graphrefly_core/node.rs
1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//! terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//! (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//! `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//! resubscribable terminal node resets lifecycle, except after TEARDOWN
26//! (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//! that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//! `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//! a nested wave (`WaveState::in_tick` is cleared before the
50//! deferred-fire phase).
51//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
52//! acquires + drops the state lock per fn-fire iteration around the
53//! `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
54//! etc. and run a nested wave.
55//! - **`BindingBoundary::custom_equals`** fires lock-released.
56//! `commit_emission` brackets the equals check around a lock release;
57//! custom equals oracles may re-enter Core safely.
58//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
59//! acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
60//! serialization), installs the sink under the state lock, drops the state
61//! lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
62//! `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
63//! A handshake-time sink callback may re-enter Core (`emit` / `complete` /
64//! `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
65//! transparently. Cross-thread emits block on `wave_owner` until the
66//! subscribe path drops it, preserving R1.3.5.a happens-after ordering.
67
68use std::collections::VecDeque;
69use std::panic::{catch_unwind, AssertUnwindSafe};
70use std::sync::{Arc, Weak};
71
72use ahash::{AHashMap as HashMap, AHashSet as HashSet};
73use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
74
75/// Held guard from `parking_lot::ReentrantMutex::lock_arc()` on a
76/// partition's `wave_owner`. `!Send` per `parking_lot::ReentrantMutex`'s
77/// thread-affinity contract (the inner guard is `!Send`; the wrapper
78/// inherits) — the type-level `!Send` flows into
79/// [`crate::batch::BatchGuard::_wave_guards`] so any attempt to send
80/// the batch guard across threads fails to compile.
81///
82/// **Phase H+ option (d) limited variant (2026-05-09):** the guard's
83/// [`Drop`] pops `sid` from the [`held_partitions`] thread-local so
84/// the ascending-order check on the next acquire sees the correct
85/// "currently held" set. Re-entrant acquires (same thread, same
86/// partition) increment a refcount in the thread-local; final drop
87/// removes the entry.
88///
89/// Slice Y1 / Phase E (2026-05-08); Phase H+ wrapper (2026-05-09).
90pub(crate) struct WaveOwnerGuard {
91 /// Drop order: the wrapper's Drop runs FIRST (pops `held_partitions`),
92 /// then `inner` drops automatically (releases the parking_lot lock).
93 /// Field-declaration order matters in Rust: the wrapper drops top-
94 /// down by default, so `inner` is listed AFTER `sid` so the wrapper's
95 /// custom Drop runs on the whole struct first, releasing the
96 /// thread-local entry under our control before the inner guard
97 /// hits parking_lot's release path.
98 sid: crate::subgraph::SubgraphId,
99 /// `#[allow(dead_code)]`: the inner guard is held to keep the
100 /// parking_lot::ReentrantMutex acquired for the wave's duration;
101 /// it's never read, only its `Drop` matters.
102 #[allow(dead_code)]
103 inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
104}
105
106impl Drop for WaveOwnerGuard {
107 fn drop(&mut self) {
108 // `held_partitions::release` returns `bool was_outermost`. The
109 // outermost-release signal is consumed by [`crate::batch::BatchGuard::drop`]
110 // for the per-thread `TIER3_EMITTED_THIS_WAVE` clear (D1 patch,
111 // 2026-05-09 — Slice G coalescing tracker is keyed by thread,
112 // not by partition, so the clear lives on `BatchGuard` not here).
113 // We discard the bool — no per-guard cleanup remains.
114 let _ = held_partitions::release(self.sid);
115 // `inner` drops automatically after this — releases the
116 // parking_lot::ReentrantMutex (decrementing parking_lot's
117 // own internal re-entry counter; only the FINAL release
118 // unparks waiters).
119 }
120}
121
122/// Phase H+ option (d) limited variant — thread-local infrastructure
123/// for cross-partition acquire-during-fire / cross-partition
124/// acquire-during-wave deadlock detection (extended /qa N1(a)
125/// 2026-05-09 to cover sink callbacks + any nested in-wave acquire).
126///
127/// **Protocol invariant enforced:** whenever this thread already
128/// holds at least one partition wave_owner (HELD non-empty), every
129/// NEW partition this thread tries to acquire must have an id
130/// strictly greater than every partition currently held. Re-entrant
131/// acquires (same thread, same partition that's already held) bypass
132/// the check (they're fine — `parking_lot::ReentrantMutex` allows
133/// same-thread re-entry transparently).
134///
135/// Without this check, two threads each doing nested cross-partition
136/// acquires within an active wave could form an AB/BA cycle: thread A
137/// holds X, attempts Y (Y < X); concurrently thread B holds Y,
138/// attempts X (X > Y, ascending-OK from B's POV). A's acquire on Y
139/// blocks behind B; B's acquire on X blocks behind A. Cycle.
140///
141/// **Single carve-out — producer build/project closures:** producer-
142/// pattern operator activation (`zip` / `concat` / `race` /
143/// `take_until` / `switch_map` / `exhaust_map` / `concat_map` /
144/// `merge_map`) runs build closures inside `BindingBoundary::invoke_fn`
145/// that legitimately subscribe to upstream sources cross-partition.
146/// Refactoring those operators to defer their inner subscribes to
147/// wave-end is the broader "Phase H+ STRICT variant" carry-forward
148/// (see `docs/porting-deferred.md`). For the limited variant, the
149/// thread-local [`IN_PRODUCER_BUILD`] refcount is incremented when a
150/// producer node enters its FiringGuard scope and the H+ check is
151/// suppressed for the duration. All other call paths (derived /
152/// dynamic user fns, sink callbacks, subscribe handshakes, drop
153/// cleanup) DO see the check.
154///
155/// **What this widening (/qa N1(a) 2026-05-09) catches that the
156/// original `fire_depth > 0` gate did NOT:**
157/// - Sink callbacks fired by `flush_notifications` lock-released
158/// AFTER drain — the outer wave's wave_owner is still held but
159/// no FiringGuard is on the stack. A sink that calls
160/// `core.subscribe(node_in_lower_partition, sink)` would have
161/// bypassed the original gate; now caught.
162/// - Subscribe-handshake-time cross-partition operations (similar
163/// shape — the handshake fires lock-released).
164/// - Any future re-entry path that doesn't go through `FiringGuard`
165/// (e.g., `Subscription::Drop` cleanup paths, R3.7-style graph
166/// destroy cascades).
167mod held_partitions {
168 use crate::subgraph::SubgraphId;
169 use smallvec::SmallVec;
170 use std::cell::RefCell;
171
172 /// Inline-storage capacity for the per-thread held-partitions set.
173 /// 4 mirrors the same inline limit used elsewhere in the codebase
174 /// (`compute_touched_partitions` returns `SmallVec<[SubgraphId; 4]>`,
175 /// `BatchGuard::_wave_guards` is `SmallVec<[WaveOwnerGuard; 4]>`).
176 /// In the typical wave a single thread holds 1–3 partitions; spillover
177 /// to the heap costs allocation but is correct.
178 const HELD_INLINE: usize = 4;
179
180 thread_local! {
181 /// Currently-held partitions on this thread, as `(SubgraphId, refcount)`
182 /// pairs in arbitrary order. The refcount mirrors
183 /// `parking_lot::ReentrantMutex`'s internal counter (we can't query
184 /// parking_lot's directly).
185 ///
186 /// `SmallVec<[_; HELD_INLINE]>` over `BTreeMap<_, _>`: under the
187 /// expected workload (≤4 partitions held simultaneously per wave) the
188 /// inline-storage SmallVec keeps the entire set in stack memory with
189 /// no allocation, no Box-per-node, and contiguous cache layout. Linear
190 /// scans are branch-predictable and faster than BTreeMap's logn
191 /// pointer-chasing for tiny N. Phase J post-widening bench
192 /// (`migration-status.md` 2026-05-09) reported 14–25% overhead vs the
193 /// pre-widening baseline, attributed in part to BTreeMap allocation
194 /// costs on the hot path. This swap recovers part of that overhead.
195 ///
196 /// Bookkeeping is unconditional — every acquire bumps the
197 /// refcount, every release decrements. The CHECK gate
198 /// (`!held.is_empty() && !in_producer_build()`) is what
199 /// distinguishes "first-time acquire on a fresh thread"
200 /// (allowed, held empty) from "nested acquire while we
201 /// already hold something" (must be ascending).
202 static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
203 = const { RefCell::new(SmallVec::new_const()) };
204
205 /// Per-thread "we're inside a producer build/project closure"
206 /// refcount. Producer-pattern operator activation increments
207 /// this on `FiringGuard::new`; their build closures call
208 /// cross-partition `Core::subscribe` legitimately during
209 /// activation (operator-internal setup, not user-fn re-entry).
210 /// The H+ check is suppressed while this counter is non-zero.
211 ///
212 /// Tracked as a refcount (not a bool) because nested producer
213 /// activations are theoretically possible (e.g., a producer
214 /// whose build closure subscribes to ANOTHER producer that
215 /// also activates) and must balance correctly. The increment
216 /// uses `checked_add(1)` so an unbounded recursion (a real
217 /// bug — protocol cascades are bounded by
218 /// `MAX_BATCH_DRAIN_ITERATIONS`) panics instead of silently
219 /// saturating and disabling the check.
220 ///
221 /// /qa N1(a) (2026-05-09): replaced the original `FIRE_DEPTH`
222 /// thread-local. The previous design gated the H+ check on
223 /// fire_depth > 0; this restricted coverage to the
224 /// FiringGuard-wrapped invoke_fn scope. The new design
225 /// inverts: gate on `held non-empty AND !in_producer_build`.
226 /// Catches sink-callback / handshake / drop-cleanup paths
227 /// the previous gate missed.
228 static IN_PRODUCER_BUILD: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
229 }
230
231 /// Increment the producer-build refcount. Called from
232 /// `FiringGuard::new` ONLY for `is_producer()` nodes, AND from
233 /// `ProducerCtx::subscribe_to`'s wrapped sink (so producer-
234 /// internal sink callbacks also suppress the H+ check).
235 ///
236 /// # Panics
237 ///
238 /// Panics if the per-thread refcount would overflow `u32::MAX`.
239 /// This indicates an unbounded recursion through producer build /
240 /// sink dispatch — a real bug. Safer to surface than to silently
241 /// saturate and disable the check or produce inverted Drop
242 /// semantics. The protocol's `MAX_BATCH_DRAIN_ITERATIONS` cap
243 /// makes this overflow unreachable in practice.
244 pub fn producer_build_enter() {
245 IN_PRODUCER_BUILD.with(|c| {
246 let next = c.get().checked_add(1).expect(
247 "in_producer_build refcount overflow — unbounded \
248 producer-build re-entrance. Should be bounded by the \
249 protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
250 );
251 c.set(next);
252 });
253 }
254
255 /// Decrement the producer-build refcount. Called from
256 /// `FiringGuard::drop` ONLY for guards that incremented in `new`,
257 /// AND from `ProducerSinkGuard::drop` in the producer-sink
258 /// wrapper. Saturates on underflow (would indicate Drop without
259 /// matching `new` — recovery via no-op is safer than panicking
260 /// in Drop).
261 pub fn producer_build_exit() {
262 IN_PRODUCER_BUILD.with(|c| c.set(c.get().saturating_sub(1)));
263 }
264
265 /// Currently inside a producer build/project closure on this thread?
266 fn in_producer_build() -> bool {
267 IN_PRODUCER_BUILD.with(|c| c.get() > 0)
268 }
269
270 /// Phase H+ check + bookkeeping. Called BEFORE acquiring the
271 /// partition's parking_lot::ReentrantMutex.
272 ///
273 /// Panics with a clear diagnostic if:
274 /// - HELD is non-empty (this thread already holds ≥1 partition),
275 /// - AND we're NOT inside a producer build closure,
276 /// - AND `sid` is NOT already held by this thread,
277 /// - AND `sid <= max(currently held)`.
278 ///
279 /// Otherwise: increments the refcount for `sid` (creating the
280 /// entry if needed) and returns. The caller MUST pair every
281 /// call with a [`release`] when the guard drops.
282 ///
283 /// **Important note on cross-thread vs same-thread:** this check
284 /// is a SAME-THREAD invariant — it catches a thread acquiring
285 /// out of order from itself. Cross-thread AB/BA cycles between
286 /// threads with disjoint same-thread acquisition orders are
287 /// prevented at a different layer (the
288 /// `compute_touched_partitions` upfront-acquire-all-ascending
289 /// rule in `Core::begin_batch_for`). This thread-local check
290 /// adds the layer that prevents a same-thread descending acquire
291 /// from creating the FIRST half of a cross-thread cycle.
292 pub(crate) fn check_and_acquire(sid: SubgraphId) {
293 HELD.with(|h| {
294 let mut held = h.borrow_mut();
295 // Gate: held non-empty (we're nested) AND not in producer
296 // build/sink (the v1 carve-out for operator activation +
297 // producer-internal sink callbacks). First-time acquires
298 // on a fresh thread (held empty) skip the check — there's
299 // nothing to compare against.
300 let already_held = held.iter().any(|(s, _)| *s == sid);
301 if !held.is_empty() && !in_producer_build() && !already_held {
302 // Linear-scan max over the inline storage (typical N ≤ 4).
303 // Branch-predictable and cache-local; no allocation.
304 if let Some(max_held) = held.iter().map(|(s, _)| *s).max() {
305 if sid <= max_held {
306 // Drop the borrow before panicking so unwind
307 // doesn't see a still-borrowed RefCell.
308 let new_id = sid;
309 drop(held);
310 panic!(
311 "Phase H+ ascending-order violation: thread tried \
312 to acquire partition {new_id:?} while already \
313 holding partition {max_held:?}. \
314 The same-thread cross-partition lock-acquisition \
315 protocol requires every NEW partition acquired \
316 while ANY partition is already held to have id \
317 strictly greater than every already-held \
318 partition; otherwise two threads doing \
319 reciprocal acquires can form an AB/BA deadlock.\n\
320 \n\
321 Note: this check is per-thread. A cross-thread \
322 AB/BA cycle between threads each obeying \
323 ascending order at the per-thread level is \
324 prevented at a different layer — the \
325 `compute_touched_partitions` upfront-acquire-\
326 all-ascending rule in `Core::begin_batch_for`.\n\
327 \n\
328 Common triggers (see docs/porting-deferred.md \
329 'Cross-partition acquire-during-fire deadlock'):\n\
330 - A user fn (derived / dynamic) that calls \
331 `Core::emit` / `complete` / `error` / `teardown` \
332 / `invalidate` mid-fire on a node in a partition \
333 with a smaller id than the firing node's.\n\
334 - A sink callback that calls `Core::subscribe` \
335 on a node in a smaller-id partition while the \
336 outer wave's wave_owner is still held.\n\
337 - A subscribe handshake (or Drop cleanup) that \
338 re-enters Core on a smaller-id partition.\n\
339 \n\
340 Fix: schedule the cross-partition operation \
341 OUTSIDE the wave (e.g., via a deferred queue \
342 applied at wave end) so the acquire happens at \
343 top-level rather than nested under a held \
344 partition; OR declare the cross-partition \
345 reachability upfront via `add_meta_companion` \
346 so the wave acquires both partitions ascending \
347 at top-level and the inner re-entry becomes a \
348 re-entrant acquire on a held partition."
349 );
350 }
351 }
352 }
353 // Bookkeeping: increment refcount. `checked_add(1)` so
354 // overflow surfaces (would indicate an unbounded
355 // re-entrance — a real bug). Linear find-or-push.
356 if let Some((_, count)) = held.iter_mut().find(|(s, _)| *s == sid) {
357 *count = count.checked_add(1).expect(
358 "held_partitions refcount overflow — unbounded \
359 same-partition re-entrance. Should be bounded by the \
360 protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
361 );
362 } else {
363 held.push((sid, 1));
364 }
365 });
366 }
367
368 /// Decrement the refcount for `sid`; remove the entry if it
369 /// hits zero. Called from [`super::WaveOwnerGuard::drop`] AND
370 /// from the retry / panic paths in
371 /// [`super::Core::partition_wave_owner_lock_arc`] to ensure the
372 /// refcount stays balanced under all unwind / retry / exhaust
373 /// paths.
374 ///
375 /// Returns `true` iff this release brought the partition's
376 /// refcount on this thread to zero — i.e. this was the OUTERMOST
377 /// guard for `sid` on this thread. [`super::WaveOwnerGuard::drop`]
378 /// uses this signal to clear per-partition wave state (Q3) on
379 /// outermost release only; inner re-entrant guard drops must NOT
380 /// clear (a containing wave is still active and still holds
381 /// in-flight wave state). The `partition_wave_owner_lock_arc`
382 /// retry / panic paths ignore the return value because the
383 /// partition state hadn't been touched yet on those paths
384 /// (clearing would be a no-op anyway).
385 pub(crate) fn release(sid: SubgraphId) -> bool {
386 HELD.with(|h| {
387 let mut held = h.borrow_mut();
388 if let Some(idx) = held.iter().position(|(s, _)| *s == sid) {
389 let count = &mut held[idx].1;
390 // /qa A3 fix (2026-05-09): debug_assert the refcount is
391 // non-zero before decrement. A `release(sid)` on an
392 // entry with `count == 0` indicates a bookkeeping bug
393 // (a release without a matching `check_and_acquire`,
394 // or a logic error in caller), but the legacy
395 // saturating_sub silently returned `was_outermost=true`
396 // and removed the entry — masking the bug. Surface
397 // in dev/test builds; release builds preserve the
398 // saturating behavior.
399 debug_assert!(
400 *count > 0,
401 "held_partitions::release({sid:?}): refcount underflow — \
402 release without matching check_and_acquire (caller bug)"
403 );
404 *count = count.saturating_sub(1);
405 if *count == 0 {
406 // `swap_remove` is O(1) and order-irrelevant: the
407 // CHECK gate computes max via linear scan and does
408 // not depend on iteration order.
409 held.swap_remove(idx);
410 return true;
411 }
412 } else {
413 // /qa A3 fix (2026-05-09): same intent — release on a
414 // sid that's not in HELD is a bookkeeping bug. Surface
415 // in dev/test builds.
416 debug_assert!(
417 false,
418 "held_partitions::release({sid:?}): sid not in HELD — \
419 double-drop or stray release (caller bug)"
420 );
421 }
422 false
423 })
424 }
425
426 /// Test-only: read the current thread's held-partitions snapshot.
427 /// Used by post-panic regression assertions to verify the
428 /// thread-local stays clean even when the H+ check unwinds the
429 /// stack (so cargo's thread-reuse doesn't propagate corrupted
430 /// state to subsequent tests). `pub` (gated by
431 /// `cfg(any(test, debug_assertions))`) so integration tests
432 /// outside the crate can read it.
433 #[cfg(any(test, debug_assertions))]
434 #[must_use]
435 pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
436 // /qa A2 fix (2026-05-09): sort by SubgraphId so the snapshot
437 // returns ascending order — matches the BTreeMap-iteration
438 // contract that pre-/qa SmallVec swap consumers might rely on.
439 // Test consumers currently only assert `is_empty()`, but the
440 // ordered shape is the safer default for future tests that
441 // assert specific entries.
442 let mut v: Vec<(SubgraphId, u32)> = HELD.with(|h| h.borrow().to_vec());
443 v.sort_unstable_by_key(|(s, _)| *s);
444 v
445 }
446
447 /// Test-only: read the current thread's producer-build refcount.
448 /// Companion to [`held_snapshot_for_tests`] for verifying the
449 /// thread-local stays clean post-panic.
450 #[cfg(any(test, debug_assertions))]
451 #[must_use]
452 pub fn in_producer_build_for_tests() -> u32 {
453 IN_PRODUCER_BUILD.with(std::cell::Cell::get)
454 }
455}
456
457/// `pub` re-exports for the `graphrefly-operators` crate to wrap
458/// producer-internal sinks with the same `IN_PRODUCER_BUILD` flag
459/// the FiringGuard uses (per /qa N1(a) — operator sinks are
460/// operator-internal and SUPPRESS the H+ check, mirroring the
461/// activation-time carve-out). Not part of the v1 stable user API
462/// surface; intended for in-workspace consumers only. Phase H+
463/// STRICT variant (the producer-architecture refactor) will
464/// eliminate the need for this carve-out.
465pub use held_partitions::{producer_build_enter, producer_build_exit};
466
467/// Test-only re-exports for integration tests under
468/// `crates/graphrefly-core/tests/`. Gated `#[cfg(any(test, debug_assertions))]`
469/// so they don't leak into release builds. Public visibility is
470/// required because integration tests live outside the crate.
471#[cfg(any(test, debug_assertions))]
472pub use held_partitions::{held_snapshot_for_tests, in_producer_build_for_tests};
473use smallvec::SmallVec;
474use thiserror::Error;
475
476use crate::boundary::{BindingBoundary, CleanupTrigger};
477use crate::clock::monotonic_ns;
478use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
479use crate::message::Message;
480
481/// Terminal-lifecycle state — once set on a node, the node will not emit
482/// further DATA; per-dep slots on consumers also use this to track which
483/// upstreams have terminated (R1.3.4 / Lock 2.B).
484///
485/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
486/// retained when the variant is stored in a node's `terminal` slot or any
487/// consumer's `dep_terminals` slot; v1 does not release these (terminal
488/// state is one-shot at this layer; release happens on resubscribable
489/// terminal-lifecycle reset, a separate slice).
490#[derive(Copy, Clone, Debug, PartialEq, Eq)]
491pub enum TerminalKind {
492 Complete,
493 Error(HandleId),
494}
495
496/// Node kind discriminant — **derived metadata** computed from
497/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
498///
499/// Core no longer stores `kind` as a field; it's computed on demand from
500/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
501/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
502/// shape uniquely identifies the kind:
503///
504/// | deps | fn_id | op | is_dynamic | kind |
505/// |-----------|-------|------|-----------|----------|
506/// | empty | None | None | - | State |
507/// | empty | Some | None | - | Producer |
508/// | non-empty | Some | None | false | Derived |
509/// | non-empty | Some | None | true | Dynamic |
510/// | non-empty | None | Some | - | Operator |
511///
512/// Public API ([`Core::kind_of`]) derives this enum on each call. State
513/// nodes are ROM (cache survives deactivation); compute nodes
514/// (Derived / Dynamic / Operator) and producers are RAM.
515#[derive(Copy, Clone, Eq, PartialEq, Debug)]
516pub enum NodeKind {
517 /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
518 State,
519 /// Producer node: fn fires once on first subscribe. No deps;
520 /// emissions arrive via sinks the fn subscribes to (zip / concat /
521 /// race / takeUntil pattern). Slice D / D031.
522 Producer,
523 /// Derived node: fn fires on every dep change; all deps tracked.
524 Derived,
525 /// Dynamic node: fn declares which dep indices it actually read this run.
526 /// Untracked dep updates flow through cache but do NOT re-fire fn.
527 Dynamic,
528 /// Operator node: built-in dispatch path for transform / combine /
529 /// flow / resilience operators. The `OperatorOp` discriminant selects
530 /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
531 /// Core manages per-operator state via the generic `op_scratch` slot
532 /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
533 Operator(OperatorOp),
534}
535
536impl NodeKind {
537 /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
538 /// and Operator(Last) must intercept upstream COMPLETE so they can emit
539 /// their accumulator / buffered value before the cascade terminates them;
540 /// instead of cascading, terminate_node queues such children for fn-fire
541 /// so `fire_operator` can handle the terminal.
542 pub(crate) fn skips_auto_cascade(self) -> bool {
543 matches!(
544 self,
545 NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
546 )
547 }
548}
549
550/// Built-in operator discriminant. Selects the per-operator dispatch path
551/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
552/// carries the binding-side closure ids (and seed handle for stateful
553/// folders) needed for the wave-execution path; Core stores no user values
554/// itself per the handle-protocol cleaving plane.
555#[derive(Copy, Clone, Eq, PartialEq, Debug)]
556pub enum OperatorOp {
557 /// `map(source, project)` — element-wise transform. Calls
558 /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
559 /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
560 /// semantics — no equals substitution between batch entries).
561 Map { fn_id: FnId },
562 /// `filter(source, predicate)` — silent-drop selection (D012/D018).
563 /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
564 /// passing input verbatim. If zero pass on a wave that dirtied the
565 /// node, queues a single `RESOLVED` to settle (D018).
566 Filter { fn_id: FnId },
567 /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
568 /// `seed` is captured at registration; `acc` lives in
569 /// [`ScanState`](super::op_state::ScanState) inside
570 /// [`NodeRecord::op_scratch`] and persists across waves until
571 /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
572 /// &inputs) -> SmallVec<HandleId>` per fire.
573 Scan { fn_id: FnId, seed: HandleId },
574 /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
575 /// COMPLETE. Accumulates silently while source DATA flows; on
576 /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
577 /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
578 /// auto-cascade (see `NodeKind::skips_auto_cascade`).
579 Reduce { fn_id: FnId, seed: HandleId },
580 /// `distinctUntilChanged(source, equals)` — suppresses adjacent
581 /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
582 /// prev, current)` per input; emits non-equal items verbatim and
583 /// updates `prev`. If zero items pass on a wave that dirtied the node,
584 /// queues `RESOLVED` (matches Filter discipline).
585 DistinctUntilChanged { equals_fn_id: FnId },
586 /// `pairwise(source)` — emits `(prev, current)` pairs starting after
587 /// the second value. First value swallowed (sets `prev`). Calls
588 /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
589 /// produce the binding-side tuple handle.
590 Pairwise { fn_id: FnId },
591
592 // ----- Slice C-2: multi-dep combinators (D020) -----
593 /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
594 /// the latest handle per dep into a single tuple handle via
595 /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
596 /// (`partial: false` default) holds until all deps deliver real DATA
597 /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
598 Combine { pack_fn: FnId },
599
600 /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
601 /// (D021, Phase 10.5). Packs `[primary, secondary]` via
602 /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
603 /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
604 /// settles with RESOLVED (D018 pattern). First-run gate holds until
605 /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
606 /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
607 /// warmup, settles with RESOLVED (no stale pair).
608 WithLatestFrom { pack_fn: FnId },
609
610 /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
611 /// (D022). Zero FFI on fire: no transformation, no binding call.
612 /// Each dep's batch handles are retained and emitted individually.
613 /// COMPLETE cascades when all deps complete (R1.3.4.b).
614 Merge,
615
616 // ----- Slice C-3: flow operators (D024) -----
617 /// `take(source, count)` — emits the first `count` DATA values then
618 /// self-completes via `Core::complete`. Tracks `count_emitted` in
619 /// [`TakeState`](super::op_state::TakeState). When upstream completes
620 /// before `count` is reached, the standard auto-cascade propagates
621 /// COMPLETE. `count == 0` is allowed: the first fire emits zero
622 /// items then immediately self-completes (D027).
623 Take { count: u32 },
624
625 /// `skip(source, count)` — drops the first `count` DATA values; once
626 /// the threshold is crossed, subsequent DATAs pass through verbatim.
627 /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
628 /// On a wave where every input is still in the skip window, queues
629 /// DIRTY+RESOLVED to settle (D018 pattern).
630 Skip { count: u32 },
631
632 /// `takeWhile(source, predicate)` — emits while `predicate(input)`
633 /// holds; on the first `false`, emits any preceding passes then
634 /// self-completes via `Core::complete`. Reuses
635 /// [`BindingBoundary::predicate_each`] (D029); after the first
636 /// `false`, subsequent inputs in the same batch are dropped.
637 TakeWhile { fn_id: FnId },
638
639 /// `last(source)` / `last_with_default(source, default)` — buffers
640 /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
641 /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
642 /// factory (emits only `Complete` on empty stream), or a registered
643 /// default handle (emits `Data(default)` + `Complete` on empty
644 /// stream). Storage: [`LastState`](super::op_state::LastState) holds
645 /// `latest` (live buffer) and `default` (registration-time, stable).
646 /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
647 /// COMPLETE.
648 Last { default: HandleId },
649}
650
651/// Registration options for [`Core::register_operator`].
652///
653/// `equals` controls operator output dedup (R5.7 — defaults to identity).
654/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
655/// fires on first DATA from any dep when `true`; default `false` matches
656/// the gated derived discipline).
657#[derive(Copy, Clone, Debug)]
658pub struct OperatorOpts {
659 pub equals: EqualsMode,
660 pub partial: bool,
661}
662
663impl Default for OperatorOpts {
664 fn default() -> Self {
665 Self {
666 equals: EqualsMode::Identity,
667 partial: false,
668 }
669 }
670}
671
672/// Closure-form fn id OR typed operator discriminant — the two dispatch
673/// paths a node can use. State / passthrough nodes pass `None` to
674/// [`Core::register`] (no fn at all).
675#[derive(Copy, Clone, Debug)]
676pub enum NodeFnOrOp {
677 /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
678 /// Used for Derived / Dynamic / Producer.
679 Fn(FnId),
680 /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
681 /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
682 /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
683 Op(OperatorOp),
684}
685
/// How a node behaves while a pause lock is held (canonical-spec §2.6).
/// TS ships three modes; the Slice F audit (2026-05-07) closed the gap in
/// the Rust port.
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// [`PausableMode::Default`] is the default per canonical §2.6, so every
/// untagged source gets it. It costs O(1) memory per node because nothing
/// is buffered; in exchange, subscribers observe a single consolidated
/// DATA on RESUME instead of the K emissions that happened mid-pause.
///
/// Regardless of mode, tier-1 (DIRTY), tier-2 (PAUSE/RESUME), tier-5
/// (COMPLETE/ERROR) and tier-6 (TEARDOWN) are never held back by a pause.
/// They stay observable so that a leaked pause-controller cannot strand
/// subscribers.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
    /// Canonical default. The fn does not fire while paused; on RESUME it
    /// fires once if any dep delivered DATA during the pause window.
    #[default]
    Default,
    /// Outgoing tier-3 / tier-4 messages are buffered per-wave and replayed
    /// on RESUME. Pick this when subscribers need the verbatim emit
    /// history (e.g. an audit log, replay-on-reconnect bridge).
    ResumeAll,
    /// The dispatcher ignores PAUSE for this node, so tier-3 flushes
    /// immediately even while a lock is held. Intended for nodes whose
    /// value production is intrinsically pause-immune (telemetry counters,
    /// monotonic timers).
    Off,
}
719
720/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
721/// here; per-kind specifics (deps, fn_or_op) live on
722/// [`NodeRegistration`].
723#[derive(Copy, Clone, Debug)]
724pub struct NodeOpts {
725 /// Initial cached value. Only valid for state nodes (no deps + no
726 /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
727 pub initial: HandleId,
728 /// Equality mode for outgoing emissions (R1.3.2). Defaults to
729 /// [`EqualsMode::Identity`].
730 pub equals: EqualsMode,
731 /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
732 /// soon as ANY dep delivers a real handle; when `false` (default),
733 /// the node holds until every dep has delivered.
734 pub partial: bool,
735 /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
736 /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
737 /// deps non-empty.
738 pub is_dynamic: bool,
739 /// Pause behavior mode (canonical §2.6). Default is
740 /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
741 pub pausable: PausableMode,
742 /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
743 /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
744 /// last N DATA emissions and replays them to late subscribers as part
745 /// of the per-tier handshake (between [`Message::Start`] and any
746 /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
747 /// (R2.6.5 explicit "DATA only").
748 pub replay_buffer: Option<usize>,
749}
750
751impl Default for NodeOpts {
752 fn default() -> Self {
753 Self {
754 initial: NO_HANDLE,
755 equals: EqualsMode::Identity,
756 partial: false,
757 is_dynamic: false,
758 pausable: PausableMode::Default,
759 replay_buffer: None,
760 }
761 }
762}
763
764/// Unified node-registration descriptor (D030, Slice D).
765///
766/// All node kinds (State / Producer / Derived / Dynamic / Operator)
767/// register through [`Core::register`] with a `NodeRegistration`. The
768/// kind is **derived from the field shape** of the registration —
769/// `(deps.is_empty(), fn_or_op variant)`:
770///
771/// | deps | fn_or_op | is_dynamic | resulting kind |
772/// |-----------|-----------|-----------|----------------|
773/// | empty | None | - | State |
774/// | empty | Some(Fn) | - | Producer |
775/// | non-empty | Some(Fn) | false | Derived |
776/// | non-empty | Some(Fn) | true | Dynamic |
777/// | non-empty | Some(Op) | - | Operator |
778///
779/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
780/// etc.) build a `NodeRegistration` and delegate.
781#[derive(Clone, Debug)]
782pub struct NodeRegistration {
783 /// Upstream deps in declaration order. Empty for state / producer.
784 pub deps: Vec<NodeId>,
785 /// Closure-form fn id or typed-op discriminant. `None` for state /
786 /// passthrough.
787 pub fn_or_op: Option<NodeFnOrOp>,
788 /// Cross-kind config knobs.
789 pub opts: NodeOpts,
790}
791
792/// Equality mode for a node's outgoing emissions.
793///
794/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
795/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
796/// (R1.3.2.b two-arg call when both sides are non-sentinel).
797#[derive(Copy, Clone, Debug)]
798pub enum EqualsMode {
799 Identity,
800 Custom(FnId),
801}
802
/// Crate-internal id of one subscription; a fresh one is allocated for
/// each [`Core::subscribe`] call. The public API exposes it only wrapped
/// in a [`Subscription`]; the raw id is used directly just by Core
/// internals and by the [`Subscription::Drop`] path.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct SubscriptionId(u64);
809
810/// RAII subscription handle.
811///
812/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
813/// registered against its node. Dropping the handle (explicitly via
814/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
815/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
816///
817/// # Lifetime semantics
818///
819/// The subscription holds a [`Weak`] reference back to the Core's state. If
820/// the Core is dropped before the subscription, the Drop impl is a silent
821/// no-op (the sink has nowhere to deregister from anyway). This avoids a
822/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
823///
824/// # Thread safety
825///
826/// `Send + Sync`. The handle can be moved across threads or dropped from
827/// any thread.
828///
829/// # Not Clone
830///
831/// `Subscription` owns the unsubscribe action exclusively. Cloning would
832/// require either "first drop wins" or "last drop wins" semantics, both
833/// of which surprise. If a binding needs multiple deregistration handles,
834/// it should subscribe multiple times (each producing a fresh handle) or
835/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
836#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
837pub struct Subscription {
838 state: Weak<Mutex<CoreState>>,
839 node_id: NodeId,
840 sub_id: SubscriptionId,
841}
842
843impl Subscription {
844 /// The node this subscription is attached to.
845 #[must_use]
846 pub fn node_id(&self) -> NodeId {
847 self.node_id
848 }
849}
850
851impl Drop for Subscription {
852 fn drop(&mut self) {
853 // Silent no-op if Core is gone. This keeps Drop infallible (no panics
854 // from a dropped subscription racing a dropped Core) and avoids
855 // surprising users with errors on shutdown.
856 //
857 // Producer deactivation (Slice D, D031): if removing this sub
858 // empties the subscribers map AND the node is a producer, fire
859 // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
860 // the state lock. The binding then drops its per-node state
861 // (subscriptions to upstream sources, captured closure state),
862 // which transitively unsubs from upstreams via their own
863 // `Subscription::Drop`. Re-entrance into Core from the deactivate
864 // hook is permitted since the lock is released first.
865 let Some(state) = self.state.upgrade() else {
866 return;
867 };
868 // Slice E2 (D056): when the last subscriber drops, fire the
869 // node's OnDeactivation cleanup hook BEFORE producer_deactivate
870 // (cleanup may release handles the producer subscription owns;
871 // reverse order would let producer_deactivate drop subs that user
872 // cleanup expected to be live). Both calls are lock-released per
873 // D045.
874 //
875 // OnDeactivation gating (D068, QA Q3 fix): fires only when the
876 // node has fired its fn at least once AND has a fn (`fn_id`
877 // populated). State nodes have no fn — they cannot register a
878 // cleanup spec via the production fn-return path (R2.4.5), so
879 // firing `cleanup_for` on them is wasted FFI; the binding's
880 // lookup is guaranteed to find no `current_cleanup`. Skipping
881 // here saves the FFI hop and matches the design-doc wording
882 // ("never-fired state nodes" — state-with-initial-value satisfies
883 // `has_fired_once = true` but still has no fn).
884 //
885 // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
886 // node that's ALREADY terminal (terminate fired BEFORE this last
887 // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
888 // + producer_deactivate. Mutually exclusive with `terminate_node`'s
889 // queue-wipe site: terminate-with-empty-subs goes through
890 // `pending_wipes`; terminate-with-live-subs routes here when
891 // those subs eventually drop. Either path fires exactly one
892 // wipe per terminal lifecycle.
893 let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
894 let mut s = state.lock();
895 let Some(rec) = s.nodes.get_mut(&self.node_id) else {
896 return;
897 };
898 rec.subscribers.remove(&self.sub_id);
899 // Slice X4 / D2: bump revision so any pending_notify entry for
900 // this node opened earlier in the wave starts a fresh batch on
901 // the next queue_notify, dropping the now-departed sink from
902 // the snapshot.
903 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
904 let last = rec.subscribers.is_empty();
905 let producer = rec.is_producer();
906 // OnDeactivation gate: must have run a fn at least once
907 // (has_fired_once) AND have a fn registered (fn_id.is_some()).
908 // The fn_id check excludes state nodes whose has_fired_once
909 // tracks initial-value status, not "user fn ran."
910 let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
911 let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
912 // Clone the binding Arc out only if at least one hook will
913 // fire. Cheap (Arc::clone) in the common path; skipped on
914 // non-last-sub or never-fired non-producer nodes.
915 let binding = if last && (producer || user_cleanup || fire_wipe) {
916 Some(s.binding.clone())
917 } else {
918 None
919 };
920 (last, producer, user_cleanup, fire_wipe, binding)
921 };
922 if was_last_sub {
923 if let Some(binding) = binding {
924 if has_user_cleanup {
925 binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
926 }
927 if is_producer {
928 binding.producer_deactivate(self.node_id);
929 }
930 // D069: eager wipe — fires AFTER OnDeactivation so the
931 // user closure observes pre-wipe `store` (matches the
932 // existing "OnDeactivation runs before wipe on terminal
933 // reset" invariant covered by test 10). Idempotent —
934 // `HashMap::remove` on absent key is a no-op, so even
935 // if the wave already drained `pending_wipes` earlier,
936 // this fire is benign.
937 if fire_wipe {
938 binding.wipe_ctx(self.node_id);
939 }
940 }
941 }
942 }
943}
944
945// Compile-time assertion that Subscription is Send + Sync. If a future field
946// breaks this, the build fails here rather than downstream at the binding
947// site.
948const _: fn() = || {
949 fn assert_send_sync<T: Send + Sync>() {}
950 assert_send_sync::<Subscription>();
951};
952
953/// A subscriber callback. `Send + Sync` so the Core can fire it from any
954/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
955/// mutable state in `Mutex<T>` or atomics on the binding side.
956pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
957
958// ---------------------------------------------------------------------------
959// PAUSE/RESUME state — §10.2 of the rust-port session doc
960// ---------------------------------------------------------------------------
961
962/// Per-node pause state.
963///
964/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
965/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
966/// the buffered fields are unreachable in the [`Self::Active`] variant —
967/// the compiler refuses access. Per §10.2 simplification.
968///
969/// # Invariants
970///
971/// - `Active` ⇔ no lockId held.
972/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
973/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
974/// only. Other tiers pass through immediately even while paused.
975/// - `dropped` counts messages that fell out the front of `buffer` due to
976/// the Core-global `pause_buffer_cap`; it is reported on resume so callers
977/// can detect overflow without re-tracking it externally.
978#[derive(Debug)]
979pub(crate) enum PauseState {
980 Active,
981 Paused {
982 /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
983 /// stack-allocated. Replaces `Set<unknown>` from TS.
984 locks: SmallVec<[LockId; 2]>,
985 /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
986 /// Replayed on the final RESUME.
987 buffer: VecDeque<Message>,
988 /// Count of messages dropped from the front when `buffer.len()` would
989 /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
990 /// cycle starts fresh).
991 dropped: u32,
992 /// Wall-clock-monotonic ns when the lock first transitioned this node
993 /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
994 /// synthesis to compute `lock_held_duration_ms` in the diagnostic
995 /// payload (Slice F, A3 — 2026-05-07).
996 started_at_ns: u64,
997 /// True after the first overflow event in this pause cycle has been
998 /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
999 /// Subsequent overflows in the same cycle don't re-emit ERROR
1000 /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
1001 /// final RESUME (next pause cycle starts fresh).
1002 overflow_reported: bool,
1003 /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
1004 /// Set to `true` when an upstream dep delivery arrives while this
1005 /// node is paused with [`PausableMode::Default`]. On final RESUME,
1006 /// if `true`, the node is added back to `pending_fires` so the fn
1007 /// fires once with the consolidated dep state. Always `false` for
1008 /// `ResumeAll` mode (the buffered messages are the consolidation
1009 /// mechanism there). Cleared on final RESUME.
1010 pending_wave: bool,
1011 },
1012}
1013
1014impl PauseState {
1015 pub(crate) fn is_paused(&self) -> bool {
1016 matches!(self, Self::Paused { .. })
1017 }
1018
1019 fn lock_count(&self) -> usize {
1020 match self {
1021 Self::Active => 0,
1022 Self::Paused { locks, .. } => locks.len(),
1023 }
1024 }
1025
1026 fn contains_lock(&self, lock_id: LockId) -> bool {
1027 match self {
1028 Self::Active => false,
1029 Self::Paused { locks, .. } => locks.contains(&lock_id),
1030 }
1031 }
1032
1033 /// Add a lock; transitions Active → Paused on first lock. Idempotent on
1034 /// duplicate lock_id (matches TS convention; spec is silent on the case).
1035 fn add_lock(&mut self, lock_id: LockId) {
1036 match self {
1037 Self::Active => {
1038 let mut locks = SmallVec::new();
1039 locks.push(lock_id);
1040 *self = Self::Paused {
1041 locks,
1042 buffer: VecDeque::new(),
1043 dropped: 0,
1044 started_at_ns: monotonic_ns(),
1045 overflow_reported: false,
1046 pending_wave: false,
1047 };
1048 }
1049 Self::Paused { locks, .. } => {
1050 if !locks.contains(&lock_id) {
1051 locks.push(lock_id);
1052 }
1053 }
1054 }
1055 }
1056
1057 /// Mark that an upstream dep delivered DATA to a node paused with
1058 /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
1059 /// on final RESUME via [`Self::take_pending_wave`].
1060 pub(crate) fn mark_pending_wave(&mut self) {
1061 if let Self::Paused { pending_wave, .. } = self {
1062 *pending_wave = true;
1063 }
1064 }
1065
1066 /// Read and clear the `pending_wave` flag. Called from
1067 /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
1068 /// only if the node was paused with `pending_wave` set.
1069 pub(crate) fn take_pending_wave(&mut self) -> bool {
1070 if let Self::Paused { pending_wave, .. } = self {
1071 std::mem::replace(pending_wave, false)
1072 } else {
1073 false
1074 }
1075 }
1076
1077 /// Remove a lock; if the lockset becomes empty, transition Paused →
1078 /// Active and return the buffered messages for replay (along with the
1079 /// dropped count for diagnostics). Unknown lock_id is an idempotent
1080 /// no-op (matches TS, R1.2.6 implicit).
1081 fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
1082 match self {
1083 Self::Active => None,
1084 Self::Paused { locks, .. } => {
1085 if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
1086 locks.swap_remove(idx);
1087 }
1088 if locks.is_empty() {
1089 let prev = std::mem::replace(self, Self::Active);
1090 if let Self::Paused {
1091 buffer, dropped, ..
1092 } = prev
1093 {
1094 return Some((buffer, dropped));
1095 }
1096 }
1097 None
1098 }
1099 }
1100 }
1101
1102 /// Append a message to the buffer; if the buffer would exceed `cap`,
1103 /// pop from the front (oldest-first), increment `dropped`, and return
1104 /// the dropped messages so the caller can release any payload handles
1105 /// they reference. `cap` of `None` means unbounded.
1106 ///
1107 /// Returns [`PushBufferedResult`] carrying both the dropped messages
1108 /// (for refcount release) and whether this push triggered the FIRST
1109 /// overflow event in the current pause cycle (for R1.3.8.c ERROR
1110 /// synthesis — the caller schedules a single ERROR per cycle).
1111 ///
1112 /// Note: refcount management for the message's payload handle is the
1113 /// caller's responsibility — see [`Core::queue_notify`] for the
1114 /// retain/release discipline. The buffer itself is just a message
1115 /// container; refcounts cross the binding boundary.
1116 pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
1117 let mut result = PushBufferedResult::default();
1118 if let Self::Paused {
1119 buffer,
1120 dropped,
1121 overflow_reported,
1122 ..
1123 } = self
1124 {
1125 buffer.push_back(msg);
1126 if let Some(c) = cap {
1127 while buffer.len() > c {
1128 if let Some(dropped_msg) = buffer.pop_front() {
1129 result.dropped_msgs.push(dropped_msg);
1130 }
1131 *dropped = dropped.saturating_add(1);
1132 }
1133 }
1134 // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
1135 if !result.dropped_msgs.is_empty() && !*overflow_reported {
1136 *overflow_reported = true;
1137 result.first_overflow_this_cycle = true;
1138 }
1139 }
1140 result
1141 }
1142
1143 /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
1144 /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
1145 /// the configured cap (it's a Core-global value, not per-PauseState).
1146 pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1147 match self {
1148 Self::Active => None,
1149 Self::Paused {
1150 dropped,
1151 started_at_ns,
1152 ..
1153 } => {
1154 let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1155 Some((*dropped, lock_held_ns))
1156 }
1157 }
1158 }
1159}
1160
1161/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
1162/// messages (for refcount release) and an "is this the first overflow this
1163/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
1164#[derive(Default)]
1165pub(crate) struct PushBufferedResult {
1166 pub(crate) dropped_msgs: Vec<Message>,
1167 pub(crate) first_overflow_this_cycle: bool,
1168}
1169
1170/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
1171/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
1172/// drained at wave-end after the lock-released call to
1173/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1174///
1175/// `configured_max` is captured at scheduling time rather than read at
1176/// drain — the user could change `pause_buffer_cap` between schedule and
1177/// drain, and the diagnostic reads "the cap that was in effect when the
1178/// overflow happened."
1179#[derive(Debug, Clone)]
1180pub(crate) struct PendingPauseOverflow {
1181 pub(crate) node_id: NodeId,
1182 pub(crate) dropped_count: u32,
1183 pub(crate) configured_max: usize,
1184 pub(crate) lock_held_ns: u64,
1185}
1186
1187/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1188#[derive(Error, Debug, Clone, PartialEq)]
1189pub enum PauseError {
1190 #[error("pause/resume: unknown node {0:?}")]
1191 UnknownNode(NodeId),
1192}
1193
1194/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1195#[derive(Error, Debug, Clone, PartialEq)]
1196pub enum UpError {
1197 /// Node id is not registered.
1198 #[error("up: unknown node {0:?}")]
1199 UnknownNode(NodeId),
1200 /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1201 /// downstream-only per R1.4.1; rejected at the boundary.
1202 #[error(
1203 "up: tier {tier} is forbidden upstream — value (tier 3) and \
1204 terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1205 )]
1206 TierForbidden { tier: u8 },
1207}
1208
1209/// Errors returnable by [`Core::register`] and its sugar wrappers
1210/// ([`Core::register_state`], [`Core::register_producer`],
1211/// [`Core::register_derived`], [`Core::register_dynamic`],
1212/// [`Core::register_operator`]).
1213///
1214/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1215/// errors so that callers can recover from contract violations without
1216/// process abort. Every variant corresponds to a construction-time
1217/// invariant that the caller is responsible for upholding; the dispatcher
1218/// rejects the registration before any reactive state is created (so
1219/// there is no `Message::Error` channel through which to surface the
1220/// failure — these are imperative-layer errors, not reactive ones).
1221///
1222/// All variants are zero-side-effect: when [`Core::register`] returns
1223/// `Err`, no node has been added to the graph and any handle retains
1224/// taken on the way in (e.g. operator scratch seed retains via
1225/// [`BindingBoundary::retain_handle`]) have been released.
1226#[derive(Error, Debug, Clone, PartialEq, Eq)]
1227pub enum RegisterError {
1228 /// One of the supplied dep ids is not a registered node.
1229 #[error("register: unknown dep {0:?}")]
1230 UnknownDep(NodeId),
1231
1232 /// `op` was supplied (operator node) but `deps` was empty. Operator
1233 /// nodes need at least one dep — for subscription-managed combinators
1234 /// with no declared deps, use [`Core::register_producer`] instead.
1235 #[error(
1236 "register: operator nodes require at least one dep — \
1237 use register_producer for subscription-managed combinators"
1238 )]
1239 OperatorWithoutDeps,
1240
1241 /// [`NodeOpts::initial`] was set to a real handle but the registration
1242 /// shape is not a state node (state nodes are `deps.is_empty() &&
1243 /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
1244 /// for state nodes.
1245 #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
1246 InitialOnlyForStateNodes,
1247
1248 /// A supplied dep is terminal (COMPLETE / ERROR) AND not
1249 /// resubscribable. Adding it would create a permanent wedge — the dep
1250 /// will never re-emit, so the registered node would be stuck.
1251 /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
1252 #[error(
1253 "register: dep {0:?} is terminal and not resubscribable; \
1254 mark it resubscribable before terminating, or remove it from the dep list"
1255 )]
1256 TerminalDep(NodeId),
1257
1258 /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
1259 /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
1260 /// requires the seed to be a real handle so that the operator can
1261 /// emit on its first fire.
1262 #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
1263 OperatorSeedSentinel,
1264}
1265
1266/// Errors returnable by [`Core::set_pausable_mode`].
1267///
1268/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1269/// errors. Same imperative-layer error model as [`RegisterError`].
1270#[derive(Error, Debug, Clone, PartialEq, Eq)]
1271pub enum SetPausableModeError {
1272 /// `node_id` is not a registered node.
1273 #[error("set_pausable_mode: unknown node {0:?}")]
1274 UnknownNode(NodeId),
1275 /// The node currently holds at least one pause lock. Changing pausable
1276 /// mode mid-pause would lose buffered content or strand a
1277 /// `pending_wave` flag — resume all locks first.
1278 #[error(
1279 "set_pausable_mode: cannot change pausable mode while paused; \
1280 resume all locks first"
1281 )]
1282 WhilePaused,
1283}
1284
1285/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
1286/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
1287///
1288/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
1289/// and cross-wave `prev_data` for `ctx.prevData` access.
1290pub(crate) struct DepRecord {
1291 /// The dep node this record tracks.
1292 pub(crate) node: NodeId,
1293 /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
1294 /// means the dep has never emitted DATA.
1295 pub(crate) prev_data: HandleId,
1296 /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
1297 pub(crate) dirty: bool,
1298 /// Per-dep involved-this-wave flag. Distinguishes:
1299 /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
1300 /// - `!involved && data_batch.is_empty()` → dep was not in this wave
1301 pub(crate) involved_this_wave: bool,
1302 /// DATA handles accumulated this wave. Outside `batch()` scope, at most
1303 /// 1 element. Inside `batch()`, K emits on the source produce K entries
1304 /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
1305 /// taken at `deliver_data_to_consumer` time; released at wave-end
1306 /// rotation in `clear_wave_state`.
1307 pub(crate) data_batch: SmallVec<[HandleId; 1]>,
1308 /// Terminal state for this dep. `None` = dep is live.
1309 /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
1310 /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
1311 pub(crate) terminal: Option<TerminalKind>,
1312}
1313
1314impl DepRecord {
1315 fn new(node: NodeId) -> Self {
1316 Self {
1317 node,
1318 prev_data: NO_HANDLE,
1319 dirty: false,
1320 involved_this_wave: false,
1321 data_batch: SmallVec::new(),
1322 terminal: None,
1323 }
1324 }
1325}
1326
1327/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
1328///
1329/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
1330/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
1331/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
1332/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
1333/// predicates without unpacking via [`Core::kind_of`].
1334///
/// The six bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
1336/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
1337/// an orthogonal concern. `is_dynamic` is constant per node (set at
1338/// register time); the others are mutable lifecycle state. Collapsing
1339/// them into a bitfield would obscure intent.
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
    pub(crate) dep_records: Vec<DepRecord>,
    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
    /// Producer; `None` for State / Operator. (Operator dispatch goes via
    /// [`Self::op`] instead.)
    pub(crate) fn_id: Option<FnId>,
    /// Operator discriminant for typed-op dispatch. `Some` for Operator
    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
    /// either closure-form OR typed-op, never both).
    pub(crate) op: Option<OperatorOp>,
    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
    /// indices per fire). False for everything else. Only meaningful when
    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
    pub(crate) is_dynamic: bool,
    /// Equality mode for this node; see [`EqualsMode`].
    pub(crate) equals: EqualsMode,

    // Mutable state
    /// Cached value handle for this node.
    /// NOTE(review): presumably `NO_HANDLE` until the first emission — confirm.
    pub(crate) cache: HandleId,
    /// NOTE(review): presumably set once the node's fn has fired for the
    /// first time; cleared again by the resubscribable reset described on
    /// [`Self::resubscribable`] — confirm the exact set point.
    pub(crate) has_fired_once: bool,
    /// Installed sinks keyed by subscription id. Every mutation of this map
    /// bumps [`Self::subscribers_revision`].
    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
    /// handshake-panic cleanup). Used by
    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
    /// set changes and start a fresh `PendingBatch` with an updated sink
    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
    /// and multi-emit-per-wave gap where the pre-fix per-node single
    /// snapshot meant a sub installed between two emits to the same node
    /// in one wave was invisible to the second emit's flush.
    ///
    /// Per-node (not per-Core) so that a subscribe to node A doesn't
    /// invalidate snapshot reuse for node B's pending batch in the same
    /// wave.
    pub(crate) subscribers_revision: u64,
    /// For dynamic nodes: which dep indices fn actually tracks.
    /// For static derived: all indices, populated at construction.
    pub(crate) tracked: HashSet<usize>,

    // Wave-scoped state — cleared at wave end.
    /// NOTE(review): presumably marks that this node has been dirtied in
    /// the current wave — confirm against the wave engine in `batch.rs`.
    pub(crate) dirty: bool,
    /// NOTE(review): presumably marks that the current wave has touched
    /// this node at all — confirm against the wave engine in `batch.rs`.
    pub(crate) involved_this_wave: bool,

    /// Per-node pause state. Default `Active`. See [`PauseState`].
    pub(crate) pause_state: PauseState,
    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
    /// fn-fire while paused and consolidates N pause-window dep deliveries
    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
    /// for verbatim replay; `Off` ignores PAUSE entirely. See
    /// [`PausableMode`].
    pub(crate) pausable: PausableMode,
    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
    /// DATA-handle emissions for late-subscriber replay. Each handle in
    /// the buffer owns one binding-side retain share, released on evict
    /// (cap exceeded) or in `Drop for CoreState`.
    pub(crate) replay_buffer_cap: Option<usize>,
    /// Backing storage for the late-subscriber replay buffer governed by
    /// [`Self::replay_buffer_cap`].
    pub(crate) replay_buffer: VecDeque<HandleId>,

    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
    /// further `emit` calls are silent no-ops, fn no longer fires, and only
    /// the terminal message has been queued downstream.
    pub(crate) terminal: Option<TerminalKind>,
    /// True after the first TEARDOWN has been processed for this node
    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
    /// — the auto-prepended COMPLETE only fires on the first one. Without
    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
    /// to subscribers per delivery, which is incorrect.
    pub(crate) has_received_teardown: bool,
    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
    /// will reset the node: clear `terminal`, `has_fired_once`,
    /// `has_received_teardown`, all dep_records to sentinel, and drain the
    /// pause lockset. Default `false`.
    pub(crate) resubscribable: bool,
    /// Meta companion nodes attached to this node per R1.3.9.d. When this
    /// node tears down, its meta companions are torn down FIRST (before
    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
    /// observers see companions terminate before the parent. The ordering
    /// is load-bearing — meta nodes typically subscribe to parent state
    /// that becomes inconsistent during the parent's destruction phase.
    pub(crate) meta_companions: Vec<NodeId>,
    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
    /// first-run gate — the node fires as soon as ANY dep delivers a
    /// real handle, even if other deps remain sentinel. Defaults to
    /// `false` (gated). Lifted into Core for operator support; for
    /// State/Derived/Dynamic nodes the field is settable but the gated
    /// path remains the typical caller default.
    pub(crate) partial: bool,
    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
    /// `None` for non-operator kinds and operators with no cross-wave
    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
    /// [`TakeWhile`] / [`Last`]).
    ///
    /// The boxed value implements
    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
    /// `release_handles` method is called from
    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
    /// from [`Drop for CoreState`].
    ///
    /// **Refcount discipline:** the state struct owns whatever handle
    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
    /// [`LastState::latest`](crate::op_state::LastState::latest)).
    /// Per-fire helpers retain the new value before releasing the old;
    /// `release_handles` releases the current shares at end-of-life.
    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
1454
1455impl NodeRecord {
1456 // ---- Kind predicates (D030 — derived from field shape) ----
1457
1458 /// True iff this is a state node (no deps, no fn, no op).
1459 pub(crate) fn is_state(&self) -> bool {
1460 self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1461 }
1462
1463 /// True iff this is a producer node (no deps + has fn + no op).
1464 /// Producers fire fn once on first subscribe; cleanup fires via
1465 /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1466 pub(crate) fn is_producer(&self) -> bool {
1467 self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1468 }
1469
1470 /// True iff this is a compute node (Derived / Dynamic / Operator) —
1471 /// has at least one dep AND either a fn or an op.
1472 #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1473 pub(crate) fn is_compute(&self) -> bool {
1474 !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1475 }
1476
1477 /// True iff this is an Operator node (has op set).
1478 #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1479 pub(crate) fn is_operator(&self) -> bool {
1480 self.op.is_some()
1481 }
1482
1483 /// True iff this node opts OUT of Lock 2.B auto-cascade —
1484 /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1485 pub(crate) fn skips_auto_cascade(&self) -> bool {
1486 match self.op {
1487 Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1488 None => false,
1489 }
1490 }
1491
1492 /// Compute the public-API [`NodeKind`] from the field shape (D030).
1493 /// Used by [`Core::kind_of`] and rare internal sites that need the
1494 /// enum (most use the predicate methods above).
1495 pub(crate) fn kind(&self) -> NodeKind {
1496 if let Some(op) = self.op {
1497 NodeKind::Operator(op)
1498 } else if self.dep_records.is_empty() {
1499 if self.fn_id.is_some() {
1500 NodeKind::Producer
1501 } else {
1502 NodeKind::State
1503 }
1504 } else if self.is_dynamic {
1505 NodeKind::Dynamic
1506 } else {
1507 NodeKind::Derived
1508 }
1509 }
1510
1511 // ---- Existing accessors ----
1512
1513 /// Iterator over dep NodeIds in declaration order.
1514 pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1515 self.dep_records.iter().map(|r| r.node)
1516 }
1517
1518 /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1519 pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1520 self.dep_ids().collect()
1521 }
1522
1523 /// Number of deps.
1524 pub(crate) fn dep_count(&self) -> usize {
1525 self.dep_records.len()
1526 }
1527
1528 /// True if any dep is in sentinel state (never emitted DATA and no
1529 /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1530 pub(crate) fn has_sentinel_deps(&self) -> bool {
1531 self.dep_records
1532 .iter()
1533 .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1534 }
1535
1536 /// Find the index of a dep by NodeId.
1537 pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1538 self.dep_records.iter().position(|r| r.node == dep_id)
1539 }
1540
1541 /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1542 pub(crate) fn all_deps_terminal(&self) -> bool {
1543 !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1544 }
1545}
1546
1547// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
1548//
1549// The four wave-scoped fields previously held under
1550// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
1551// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
1552// [`crate::batch`]. The bench-driven rationale is documented at
1553// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
1554// `cross_partition` mutex was the dominant cost in Phase J's regression,
1555// not single-thread mutex hop count. Per-thread placement eliminates the
1556// bounce point entirely.
1557//
1558// **Refcount discipline preserved.** `wave_cache_snapshots` and
1559// `deferred_handle_releases` still hold binding-side handle retains;
1560// outermost `BatchGuard::drop` drains them via `Core::binding` on both
1561// success and panic paths (the binding ref the prior
1562// `CrossPartitionState` held for its `Drop` impl is no longer needed —
1563// `BatchGuard` already holds a `Core` clone with the binding).
1564
/// All mutable Core state, behind one [`parking_lot::Mutex`].
///
/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
/// cross-partition aggregation fields out into a separate
/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
/// entirely — its four fields moved to per-thread `WaveState`
/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
/// `in_tick` and `currently_firing` BACK to CoreState (per-Core,
/// cross-thread visible — load-bearing for cross-Core same-thread
/// isolation and cross-thread P13 set_deps check respectively).
///
/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
/// set to a per-thread thread-local in `crate::batch` (was briefly
/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
/// cross-thread `set_deps` partition splits — see
/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
/// closing summary).
///
/// Q-beyond will continue the shape decomposition by sharding most of
/// the remaining fields per-partition.
pub(crate) struct CoreState {
    /// Node-id allocation counter; [`Core::new`] seeds it at 1.
    pub(crate) next_node_id: u64,
    /// Subscription-id allocation counter; [`Core::new`] seeds it at 1.
    pub(crate) next_subscription_id: u64,
    /// Lock-id allocation counter; [`Core::new`] seeds it at `1 << 31`
    /// (see the rationale comment there).
    pub(crate) next_lock_id: u64,
    /// Every registered node's record, keyed by its id.
    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
    /// Inverted adjacency: `parent → children`. Updated on registration.
    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
    // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
    // `pending_notify` moved to per-thread
    // [`crate::batch::WaveState`]. Both fields are wave-scoped — emit
    // populates them on the same thread that drains them at wave end.
    // Cross-thread emits block on partition `wave_owner` and land in
    // the OTHER thread's wave context, so the per-thread placement is
    // safe by construction.
    //
    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
    // `deferred_cleanup_hooks`, `pending_wipes`, and
    // `invalidate_hooks_fired_this_wave` likewise moved to
    // [`crate::batch::WaveState`]. Same wave-scoped per-thread
    // rationale. (`in_tick` and `currently_firing` were moved by the
    // same sub-slice but were reverted to this struct by /qa F1+F2,
    // 2026-05-10 — see the two fields below.)
    /// Core-global cap on per-node pause replay buffer length. `None` means
    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
    /// per-node override can be added later as a pure addition without API
    /// breakage. Default `None`.
    pub(crate) pause_buffer_cap: Option<usize>,
    /// Core-global cap on wave-drain iterations before
    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
    /// (R4.3 / Lock 2.F′). Default `10_000`.
    ///
    /// The drain loop bound exists to surface runtime cycles
    /// (e.g. an operator that re-arms its own `pending_fires` slot during
    /// `invoke_fn`) as a panic with context, rather than letting Core
    /// spin forever. Structural cycles via [`Core::set_deps`] are
    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
    /// registration is structurally cycle-safe by construction (the new
    /// node's id is not allocated until AFTER deps are validated, so deps
    /// cannot transitively reach the new node). The drain bound is the
    /// safety net for runtime cycles that bypass both static checks.
    pub(crate) max_batch_drain_iterations: u32,
    /// Wave-active flag — true between outermost `BatchGuard` entry and
    /// the matching outermost drop. Per-Core (cross-thread visible) so
    /// cross-Core same-thread BatchGuard nesting is correctly isolated:
    /// Core-A's `in_tick=true` does NOT cause Core-B's `begin_batch` to
    /// see itself as nested.
    ///
    /// /qa F1 reverted (2026-05-10): briefly placed on per-thread
    /// `WaveState` in Sub-slice 3, then moved BACK to `CoreState` after
    /// /qa F1 surfaced the cross-Core same-thread isolation regression
    /// (a thread holding a live BatchGuard on Core-A and starting a wave
    /// on Core-B would observe `ws.in_tick=true` set by Core-A → Core-B
    /// becomes non-owning → Core-B's writes drained by Core-A's binding
    /// → silent corruption). The other 11 wave-scoped fields stay on
    /// per-thread `WaveState` because they're accessed only by the
    /// wave-owner thread under `wave_owner` discipline.
    pub(crate) in_tick: bool,
    /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
    /// NodeIds whose fn is currently being invoked. Pushed at the top of
    /// `fire_fn` (just before the lock-released `invoke_fn` call) and
    /// popped on return / unwind via the [`crate::batch::FiringGuard`]
    /// RAII helper. [`Core::set_deps`] consults this set and rejects
    /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
    /// firing — preventing the D1 `tracked` index corruption. Read by
    /// the P13 partition-migration check (D091) to reject mid-fire
    /// `set_deps` that would migrate a firing node's partition.
    ///
    /// **Per-Core (cross-thread visible).** /qa F2 reverted (2026-05-10):
    /// briefly placed on per-thread `WaveState` in Sub-slice 3, then
    /// moved BACK to `CoreState` after /qa F2 surfaced the cross-thread
    /// P13 bypass (per-thread placement made Thread B's `set_deps` read
    /// its own empty stack → P13 silently bypassed for cross-thread
    /// `set_deps` calls during Thread A's lock-released `invoke_fn`).
    /// Cross-thread visibility on shared `CoreState` is the load-bearing
    /// property the D091 check requires.
    ///
    /// Membership semantics (NOT strict LIFO): consumed via
    /// `contains(&n)` membership test. `FiringGuard::drop` pops the
    /// right-most matching `node_id` via `rposition` + `swap_remove`;
    /// physical order of remaining entries may not match construction
    /// order, but membership is preserved.
    pub(crate) currently_firing: Vec<NodeId>,
    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
    // moved to [`crate::batch::WaveState`].
    /// Binding-boundary handle for `Drop`-time refcount balancing.
    /// `Core` also holds a clone of this Arc; storing it here lets
    /// `Drop for CoreState` walk every retained slot and release the
    /// binding-side share when the last `Core` clone drops. Without this,
    /// `cache` / `terminal` / `dep_terminals` Error (now the per-dep
    /// record's terminal slot) / pause-buffer payload handle refs leak in
    /// the binding registry until process exit.
    pub(crate) binding: Arc<dyn BindingBoundary>,
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `wave_cache_snapshots`
    // moved to [`crate::batch::WaveState::wave_cache_snapshots`].
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
    // moved to [`crate::batch::WaveState::pending_auto_resolve`].
    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
    /// Id allocation counter for [`Self::topology_sinks`] keys;
    /// [`Core::new`] seeds it at 1.
    pub(crate) next_topology_id: u64,
    // /qa F2 reverted (2026-05-10): `currently_firing` field is declared
    // EARLIER in this struct (directly after `in_tick`). Sub-slice 3
    // briefly moved it to `crate::batch::WaveState::currently_firing` on
    // the per-thread thread_local, but per-thread placement silently
    // bypassed the cross-thread P13 partition-migration check. /qa F2
    // (2026-05-10) moved it BACK to CoreState (cross-thread visible).
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_pause_overflow`
    // moved to [`crate::batch::WaveState::pending_pause_overflow`].
    // Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): tier3-emitted-this-wave
    // tracker MOVED to a per-thread thread-local in `crate::batch`
    // (D1 patch, 2026-05-09 — was briefly per-partition under Q3 v1
    // 2026-05-09 morning). Wave-scope = thread; per-thread placement
    // is robust to mid-wave cross-thread `set_deps` partition splits
    // because thread B's split doesn't touch thread A's thread-local.
    // See [`crate::batch::TIER3_EMITTED_THIS_WAVE`] for the per-thread
    // wave-scope rationale and lifecycle (cleared at outermost
    // [`crate::batch::BatchGuard`] drop, both success + panic).
    //
    // Q-beyond Sub-slice 3 (D108, 2026-05-09):
    // `invalidate_hooks_fired_this_wave`, `deferred_cleanup_hooks`,
    // and `pending_wipes` moved to [`crate::batch::WaveState`].
    // Wave-scoped per-thread; same rationale as the other Sub-slice 3
    // migrations.
}
1707
/// The handle-protocol Core dispatcher.
///
/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
/// value to threads.
///
/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
///
/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
/// To preserve serializability of WAVE EXECUTION across threads — without
/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
/// refactor lifted — the wave engine acquires `wave_owner` (a
/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
///
/// NOTE(review): `Core` itself has no `wave_owner` field; per the
/// [`Self::registry`] docs the lock lives per partition and is acquired
/// via [`Self::partition_wave_owner_lock_arc`], so the blocking described
/// below presumably applies between threads contending for the SAME
/// partition — confirm.
///
/// Properties:
///
/// - **Same-thread re-entrance is free.** A user fn that calls back into
///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
///   on the same thread and runs as a nested wave (the inner `run_wave`
///   sees `in_tick=true` and skips drain — outer drain picks up).
/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
///   in-flight wave completes (drain + flush + sink fire all done). This
///   serializes wave OWNERSHIP across threads, while still allowing the
///   state lock to drop inside the wave for binding callbacks.
///
/// Without this, Slice A close's lock-released drain let cross-thread
/// emits absorb into the in-flight wave's `pending_notify` and return
/// before subscribers fire — breaking the user-facing happens-after
/// contract that `emit` returning means subscribers have observed.
#[derive(Clone)]
pub struct Core {
    /// Shared dispatcher state. Clones of a `Core` share this `Arc`, which
    /// is what [`Self::same_dispatcher`] compares by pointer.
    pub(crate) state: Arc<Mutex<CoreState>>,
    /// The binding boundary this Core is wired to. `CoreState` holds its
    /// own clone of the same `Arc` (installed by [`Self::new`]).
    pub(crate) binding: Arc<dyn BindingBoundary>,
    /// Slice X5 (D3 substrate, 2026-05-08) + Slice Y1 / Phase E
    /// (wave-engine migration, 2026-05-08): per-subgraph union-find
    /// registry. Tracks each registered node's connected-component
    /// membership (a "subgraph"). Each component's root carries an
    /// `Arc<SubgraphLockBox>` whose `wave_owner: ReentrantMutex<()>`
    /// is the per-partition wave-serialization lock — acquired by
    /// [`Self::partition_wave_owner_lock_arc`] under the retry-validate
    /// loop. Cross-thread emits to disjoint partitions run truly
    /// parallel; same-thread re-entry passes through reentrantly.
    ///
    /// Direct port of [`graphrefly-py`'s
    /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
    /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
    pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
1757
/// Weak handle to a [`Core`] — does not contribute to strong refcount.
///
/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
/// closures (notably `ProducerBuildFn`s registered via
/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
/// to break the BenchBinding → registry → closure → strong-Core cycle
/// that would otherwise leak the entire graph state when a `BenchCore`
/// drops with active producer registrations.
///
/// Upgrade on each invocation; if the host `Core` was already dropped,
/// `upgrade()` returns `None` and the closure should no-op (the host
/// is being torn down, no work to do).
#[derive(Clone)]
pub struct WeakCore {
    /// Weak counterpart of `Core::state`.
    state: Weak<Mutex<CoreState>>,
    /// Weak counterpart of `Core::binding`.
    binding: Weak<dyn BindingBoundary>,
    /// Weak counterpart of `Core::registry`.
    registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
1777
1778impl WeakCore {
1779 /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1780 /// host `Core`'s strong count has reached zero (i.e. the host
1781 /// `BenchCore` / equivalent owner was dropped).
1782 #[must_use]
1783 pub fn upgrade(&self) -> Option<Core> {
1784 Some(Core {
1785 state: self.state.upgrade()?,
1786 binding: self.binding.upgrade()?,
1787 registry: self.registry.upgrade()?,
1788 })
1789 }
1790}
1791
/// RAII guard that owns an [`OperatorScratch`] until either (a) the
/// caller `take()`s it for installation, or (b) the guard drops on an
/// early return / unwind, in which case the scratch's handle retains
/// are released via [`OperatorScratch::release_handles`].
///
/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
/// gaps in `Core::register`:
///
/// 1. **TOCTOU window** — the original three-phase split called
///    `lock_state()` twice (once for validation, once for insertion),
///    so a concurrent `Core::complete(dep)` on a non-resubscribable
///    dep could slip in between the two acquisitions and re-create
///    the wedge `RegisterError::TerminalDep` was designed to prevent.
///    The guard plus a single locked region for both phases closes
///    this gap (release runs lock-released because guard variables
///    drop in reverse declaration order — guard declared BEFORE
///    `lock_state()`, so the lock guard drops first).
///
/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
///    between `make_op_scratch` (Phase 2) and the explicit
///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
///    assert, OOM-as-panic on Vec growth in dep iteration) would
///    drop the `Box<dyn OperatorScratch>` without releasing the
///    seed/default retain. The guard's `Drop` impl releases on any
///    unwind path.
///
/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
/// invokes `release_handles` lock-released — fires AFTER any
/// `MutexGuard<CoreState>` declared later in the same scope drops
/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
/// pattern.
struct ScratchReleaseGuard<'a> {
    /// The guarded scratch. `Some` while the guard is armed; left as
    /// `None` once `take()` has moved it out, which turns `Drop` into a
    /// no-op.
    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
    /// Binding handed to `release_handles` when the guard drops while
    /// still armed.
    binding: &'a dyn BindingBoundary,
}
1828
1829impl<'a> ScratchReleaseGuard<'a> {
1830 fn new(
1831 scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1832 binding: &'a dyn BindingBoundary,
1833 ) -> Self {
1834 Self { scratch, binding }
1835 }
1836
1837 /// Take ownership of the scratch — disarms the release-on-drop
1838 /// behavior. Used on the success path to install the scratch on
1839 /// `NodeRecord.op_scratch`.
1840 fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1841 self.scratch.take()
1842 }
1843}
1844
1845impl Drop for ScratchReleaseGuard<'_> {
1846 fn drop(&mut self) {
1847 if let Some(mut scratch) = self.scratch.take() {
1848 scratch.release_handles(self.binding);
1849 }
1850 }
1851}
1852
1853impl Core {
1854 /// Construct a fresh Core wired to the given binding. Pause buffer cap
1855 /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
1856 #[must_use]
1857 pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1858 Self {
1859 state: Arc::new(Mutex::new(CoreState {
1860 next_node_id: 1,
1861 next_subscription_id: 1,
1862 // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
1863 // half of the u32 range so `alloc_lock_id` can't collide with
1864 // user-supplied `LockId::new(N)` constructors (which the
1865 // napi-rs binding marshals from `u32` and tests typically use
1866 // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
1867 // lowered from `1u64 << 32` to `1u64 << 31` so the value
1868 // round-trips through `u32::try_from(...)` at the napi
1869 // boundary — the previous seed errored every napi
1870 // `alloc_lock_id` call. Anti-collision intent (high range vs
1871 // low user range) preserved at half the prior ceiling
1872 // (2^31 ≈ 2 billion allocations per Core, ample for parity
1873 // tests). Lift the floor when the deferred BigInt-narrowing
1874 // migration extends `LockId` to `u64` at the FFI layer
1875 // (porting-deferred "BigInt migration for u32-narrowed napi
1876 // types" entry).
1877 next_lock_id: 1u64 << 31,
1878 nodes: HashMap::new(),
1879 children: HashMap::new(),
1880 // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires,
1881 // pending_notify, deferred_flush_jobs,
1882 // deferred_cleanup_hooks, pending_wipes, and
1883 // invalidate_hooks_fired_this_wave all live on per-thread
1884 // [`crate::batch::WaveState`].
1885 // /qa F1+F2 reverted (2026-05-10): in_tick + currently_firing
1886 // stay on CoreState (per-Core, cross-thread visible) for
1887 // cross-Core same-thread isolation (F1) and cross-thread
1888 // P13 set_deps check (F2).
1889 in_tick: false,
1890 currently_firing: Vec::new(),
1891 pause_buffer_cap: None,
1892 max_batch_drain_iterations: 10_000,
1893 binding: binding.clone(),
1894 topology_sinks: HashMap::new(),
1895 next_topology_id: 1,
1896 })),
1897 binding,
1898 registry: Arc::new(parking_lot::Mutex::new(
1899 crate::subgraph::SubgraphRegistry::new(),
1900 )),
1901 }
1902 }
1903
    /// Acquire the state lock.
    ///
    /// Returns the guard for the shared [`CoreState`] mutex; the lock is
    /// held until the returned guard is dropped.
    ///
    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
    /// transparently; cross-thread emits block on `wave_owner` until the
    /// outer subscribe completes, preserving R1.3.5.a happens-after
    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
    /// longer needed.
    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
        self.state.lock()
    }
1917
1918 /// Whether `self` and `other` point to the same dispatcher state.
1919 /// True when one was produced by `Clone`-ing the other (or they
1920 /// were both cloned from a common ancestor); false for two
1921 /// independently `Core::new`-constructed instances even with the
1922 /// same binding.
1923 ///
1924 /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
1925 /// only" v1 invariant — cross-Core mount is post-M6.
1926 #[must_use]
1927 pub fn same_dispatcher(&self, other: &Core) -> bool {
1928 Arc::ptr_eq(&self.state, &other.state)
1929 }
1930
1931 /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
1932 /// strong refcount of the underlying state / binding / wave_owner.
1933 ///
1934 /// Used by binding-stored long-lived closures (e.g.
1935 /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
1936 /// Arc cycle:
1937 ///
1938 /// ```text
1939 /// BenchBinding → registry → producer_builds[fn_id]
1940 /// → closure → strong Arc<dyn _Binding> → BenchBinding
1941 /// ```
1942 ///
1943 /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
1944 /// upgrade-on-fire (returning early if either weak is dangling —
1945 /// indicating the host BenchCore was already dropped). Upgraded
1946 /// strong refs live only for the build closure's invocation; sinks
1947 /// the build closure spawns close over those upgraded strongs and
1948 /// stay alive only while the producer is active (cleared via
1949 /// `producer_deactivate` on last-subscriber unsubscribe).
1950 #[must_use]
1951 pub fn weak_handle(&self) -> WeakCore {
1952 WeakCore {
1953 state: Arc::downgrade(&self.state),
1954 binding: Arc::downgrade(&self.binding),
1955 registry: Arc::downgrade(&self.registry),
1956 }
1957 }
1958
    /// Number of distinct connected-component partitions tracked by
    /// the per-subgraph union-find registry (Slice X5 substrate).
    /// Takes the registry mutex for the duration of the call and returns
    /// the registry's `component_count()`.
    ///
    /// Exposed for inspection (acceptance bar + debugging).
    ///
    /// NOTE(review): this comment previously said that threads on
    /// distinct partitions would only run in parallel "once Y1 wires the
    /// wave engine through the registry" and that the wave engine "still
    /// uses the legacy Core-level `wave_owner`". `Core` has no
    /// `wave_owner` field, and the `registry` field docs describe the
    /// per-partition `wave_owner` as acquired by
    /// `partition_wave_owner_lock_arc` (Slice Y1 / Phase E), so that
    /// wording looks stale — confirm.
    #[must_use]
    pub fn partition_count(&self) -> usize {
        self.registry.lock().component_count()
    }
1970
    /// Resolve `node`'s partition identity per the per-subgraph
    /// union-find registry (Slice X5 substrate). Two nodes with the
    /// same `SubgraphId` are connected via dep edges (transitively)
    /// and share a partition lock under Y1+; nodes in different
    /// partitions can run truly parallel.
    ///
    /// Takes the registry mutex for the duration of the call and
    /// delegates to the registry's own `partition_of`.
    ///
    /// Returns `None` for unregistered nodes.
    #[must_use]
    pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
        self.registry.lock().partition_of(node)
    }
1982
1983 // Q3 (2026-05-09) introduced `Core::partition_box_of(node)` to
1984 // resolve a partition's `Arc<SubgraphLockBox>` for per-partition
1985 // state access. The D1 patch (2026-05-09) moved Slice G's
1986 // `tier3_emitted_this_wave` set off `SubgraphLockBox::state` to a
1987 // per-thread thread-local in `crate::batch`, eliminating
1988 // `partition_box_of`'s only callers (`commit_emission` /
1989 // `commit_emission_verbatim`). The helper is REMOVED rather than
1990 // kept dead — Q-beyond will resurrect a similar shape when the
1991 // CoreState shard layout actually needs per-partition lookups.
1992
1993 /// Acquire `seed`'s partition `wave_owner` re-entrant mutex with
1994 /// retry-validate against concurrent union/split. Mirrors
1995 /// graphrefly-py's `subgraph_locks.py::lock_for` retry pattern
1996 /// (lines 154–178): a concurrent `union_nodes` may redirect
1997 /// `seed`'s partition root between our `lock_for` resolve and
1998 /// our `lock_arc` call; if so, the held guard is on a stale
1999 /// (but still valid) `SubgraphLockBox` whose `Arc` no longer
2000 /// matches the registry's canonical box for `seed`'s current
2001 /// root. Release + retry up to [`crate::subgraph::MAX_LOCK_RETRIES`].
2002 ///
2003 /// Returns the held guard. Caller holds it for the wave's
2004 /// duration; drop releases.
2005 ///
2006 /// **Panics** if `seed` is not registered (caller violation —
2007 /// every wave entry takes a `NodeId` already in `s.nodes`, and
2008 /// the P12-fixed lock-discipline guarantees registry membership
2009 /// is published atomically with state). **Panics** on exceeding
2010 /// `MAX_LOCK_RETRIES` — pathological union activity.
2011 ///
2012 /// Slice Y1 / Phase E (2026-05-08).
2013 pub(crate) fn partition_wave_owner_lock_arc(&self, seed: NodeId) -> WaveOwnerGuard {
2014 /// Scope-guard for the H+ thread-local refcount entry. Released on
2015 /// Drop unless `into_consumed()` is called (the success path).
2016 /// Ensures balance even on panic between `check_and_acquire` and
2017 /// successful `WaveOwnerGuard` construction (`lock_arc()` /
2018 /// `lock_for_validate()` could in principle panic; defensive).
2019 struct AcquireGuard {
2020 sid: crate::subgraph::SubgraphId,
2021 consumed: bool,
2022 }
2023 impl AcquireGuard {
2024 fn into_consumed(mut self) {
2025 self.consumed = true;
2026 }
2027 }
2028 impl Drop for AcquireGuard {
2029 fn drop(&mut self) {
2030 if !self.consumed {
2031 held_partitions::release(self.sid);
2032 }
2033 }
2034 }
2035
2036 for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2037 let (sid, lock_box) = {
2038 let mut reg = self.registry.lock();
2039 reg.lock_for(seed).expect(
2040 "partition_wave_owner_lock_arc: seed must be registered \
2041 (P12-fix invariant: registry membership is published \
2042 atomically with `s.nodes`)",
2043 )
2044 };
2045 // Phase H+ option (d) /qa N1(a) widened variant: BEFORE
2046 // acquiring the parking_lot lock, check ascending-order if
2047 // this thread already holds at least one partition AND we're
2048 // not in a producer build closure. Panics on violation.
2049 // Also increments the thread-local refcount for `sid`. The
2050 // `AcquireGuard` ensures the refcount is released on EVERY
2051 // exit path — successful return (via `into_consumed()`),
2052 // retry-validate failure (Drop fires), retry-exhaustion panic
2053 // (Drop fires before unwind), or a panic in `lock_arc()` /
2054 // `lock_for_validate()` (Drop fires during unwind).
2055 held_partitions::check_and_acquire(sid);
2056 let acquire_guard = AcquireGuard {
2057 sid,
2058 consumed: false,
2059 };
2060 let inner = lock_box.wave_owner.lock_arc();
2061 // Re-validate post-acquire. If a concurrent `union` redirected
2062 // `seed`'s root between our `lock_for` and `lock_arc`, the
2063 // registry's current box for `seed` differs from what we hold.
2064 let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
2065 if still_valid {
2066 acquire_guard.into_consumed();
2067 // `lock_box` is unused after this point — the D1 patch
2068 // moved Slice G tier3 tracking off the per-partition
2069 // `SubgraphLockBox::state` to a per-thread thread-local,
2070 // so the guard no longer carries the box reference.
2071 drop(lock_box);
2072 return WaveOwnerGuard { sid, inner };
2073 }
2074 // Stale — drop the parking_lot guard. The AcquireGuard's
2075 // Drop releases the held_partitions refcount automatically.
2076 // Yield to give the contending writer a chance to make
2077 // forward progress before re-resolving (QA-fix group 2 —
2078 // earlier tight-spin could monopolize a CPU under sustained
2079 // pathological union/split activity).
2080 drop(inner);
2081 drop(acquire_guard);
2082 std::thread::yield_now();
2083 }
2084 panic!(
2085 "partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
2086 — pathological concurrent union/split activity. Mirrors py \
2087 `_MAX_LOCK_RETRIES`.",
2088 crate::subgraph::MAX_LOCK_RETRIES,
2089 seed
2090 );
2091 }
2092
2093 /// BFS from `seed` along `s.children` (downstream consumer cascade
2094 /// for DATA / RESOLVED / INVALIDATE / COMPLETE / ERROR / TEARDOWN)
2095 /// and `meta_companions` (R1.3.9.d TEARDOWN cascade). Collects
2096 /// every partition reachable from `seed`, returning the unique
2097 /// `SubgraphId`s sorted ascending — the canonical lock-acquisition
2098 /// order per session-doc Q7 / decision D092 that guarantees
2099 /// deadlock-freedom across cross-partition waves.
2100 ///
2101 /// Holds the state lock + registry lock for the BFS duration
2102 /// (lock order `state → registry` per the P12-fix invariant).
2103 /// Bounded by the cascade graph reachable from `seed`; for typical
2104 /// apps the partition count is small (1–3) and the BFS is
2105 /// negligible relative to wave drain.
2106 ///
2107 /// Used by [`Core::begin_batch_for`] to compute the upfront-
2108 /// acquired partition set for per-seed waves. Closure-form
2109 /// [`Core::batch`] doesn't have a seed and uses
2110 /// [`Core::all_partitions_lock_boxes`] instead.
2111 ///
2112 /// Slice Y1 / Phase E (2026-05-08).
2113 pub(crate) fn compute_touched_partitions(
2114 &self,
2115 seed: NodeId,
2116 ) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
2117 let s = self.lock_state();
2118 let mut reg = self.registry.lock();
2119 let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
2120 let mut visited: HashSet<NodeId> = HashSet::default();
2121 let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
2122 stack.push(seed);
2123 while let Some(n) = stack.pop() {
2124 if !visited.insert(n) {
2125 continue;
2126 }
2127 if let Some(p) = reg.partition_of(n) {
2128 if !partitions.contains(&p) {
2129 partitions.push(p);
2130 }
2131 }
2132 if let Some(children) = s.children.get(&n) {
2133 stack.extend(children.iter().copied());
2134 }
2135 if let Some(rec) = s.nodes.get(&n) {
2136 stack.extend(rec.meta_companions.iter().copied());
2137 }
2138 }
2139 partitions.sort_unstable_by_key(|sid| sid.raw());
2140 partitions
2141 }
2142
2143 /// Snapshot of every currently-existing partition's lock box, in
2144 /// ascending [`crate::subgraph::SubgraphId`] order (canonical
2145 /// lock-acquisition order per session-doc Q7 / D092). Used by
2146 /// closure-form [`Core::batch`] / [`Core::begin_batch`] which
2147 /// don't have a known seed and must serialize against every
2148 /// existing partition.
2149 ///
2150 /// Slice Y1 / Phase E (2026-05-08).
2151 pub(crate) fn all_partitions_lock_boxes(
2152 &self,
2153 ) -> Vec<(
2154 crate::subgraph::SubgraphId,
2155 Arc<crate::subgraph::SubgraphLockBox>,
2156 )> {
2157 self.registry.lock().all_partitions()
2158 }
2159}
2160
2161/// Walk the undirected dep-edge graph from `start`, optionally
2162/// skipping ONE edge in both directions, and optionally treating
2163/// additional edges as if present. Returns every reachable
2164/// [`NodeId`].
2165///
2166/// Implementation note: uses a stack (`pop()` on a `SmallVec`) — i.e.
2167/// DFS traversal order. For pure reachability the order doesn't
2168/// matter (the visited set is identical to BFS); the function is
2169/// named "walk" rather than "BFS" to avoid implying that traversal
2170/// distance is meaningful (QA-fix group 2 — earlier name
2171/// `bfs_undirected_dep_graph` was misleading).
2172///
2173/// **Edge convention:** the dep edge `parent → child` represents
2174/// data flow from `parent` (a dep) to `child` (the consumer). It
2175/// appears in `s.children[parent]` as `child`, and in
2176/// `s.nodes[child].dep_records` as `parent`. `skip_edge =
2177/// Some((parent, child))` skips both forward (`parent → child`) and
2178/// backward (`child → parent`) traversals of that edge. Each
2179/// `(p, c)` pair in `extra_edges` is treated as if `c ∈
2180/// s.children[p]` and `p ∈ s.nodes[c].dep_records` — used for
2181/// "what would connectivity look like if THESE edges were also
2182/// present?" lookahead.
2183///
2184/// Used by Slice Y1 / Phase F (D3 split-eager, 2026-05-09):
2185/// - **P13 widening (pre-removal connectivity):** call with
2186/// `skip_edge = Some((removed_parent, removed_child))` AND
2187/// `extra_edges = added_edges_in_set_deps_call` so a `set_deps`
2188/// that simultaneously removes one edge AND adds another path
2189/// isn't falsely flagged as disconnecting (QA-fix #4 2026-05-09 —
2190/// without `extra_edges`, the pre-mutation BFS doesn't see the
2191/// would-be-added edges and rejects the conservative case).
2192/// - **Actual split execution (post-removal):** call with
2193/// `skip_edge = None` and `extra_edges = &[]`; the visited set is
2194/// the keep-side of the split (the side containing `start`).
2195pub(crate) fn walk_undirected_dep_graph(
2196 s: &CoreState,
2197 start: NodeId,
2198 skip_edge: Option<(NodeId, NodeId)>,
2199 extra_edges: &[(NodeId, NodeId)],
2200) -> HashSet<NodeId> {
2201 let mut visited: HashSet<NodeId> = HashSet::default();
2202 let mut queue: SmallVec<[NodeId; 32]> = SmallVec::new();
2203 queue.push(start);
2204 while let Some(cur) = queue.pop() {
2205 if !visited.insert(cur) {
2206 continue;
2207 }
2208 if let Some(consumers) = s.children.get(&cur) {
2209 for &c in consumers {
2210 let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sp && c == sc);
2211 if !is_skipped && !visited.contains(&c) {
2212 queue.push(c);
2213 }
2214 }
2215 }
2216 if let Some(rec) = s.nodes.get(&cur) {
2217 for d in rec.dep_records.iter().map(|r| r.node) {
2218 let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sc && d == sp);
2219 if !is_skipped && !visited.contains(&d) {
2220 queue.push(d);
2221 }
2222 }
2223 }
2224 // Virtual extra edges (e.g. would-be-added edges in
2225 // pre-mutation BFS).
2226 for &(ep, ec) in extra_edges {
2227 if cur == ep && !visited.contains(&ec) {
2228 queue.push(ec);
2229 }
2230 if cur == ec && !visited.contains(&ep) {
2231 queue.push(ep);
2232 }
2233 }
2234 }
2235 visited
2236}
2237
2238impl Core {
2239 /// Test-only inspection: number of `PendingBatch`es queued for
2240 /// `node` in the current wave. Used by Slice X4 D2 regression
2241 /// tests to pin the "common case = single batch, no SmallVec
2242 /// spill" perf invariant.
2243 ///
2244 /// Returns `None` if no `pending_notify` entry exists for `node`
2245 /// (no tier-1+ message has been queued for this node yet in this
2246 /// wave). `Some(0)` is unreachable by construction (a vacant
2247 /// entry implies no batches; an occupied entry has at least one).
2248 #[cfg(any(test, debug_assertions))]
2249 #[must_use]
2250 pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2251 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2252 // on per-thread `WaveState`. Test callers run on the same
2253 // thread that ran the wave, so the per-thread placement is
2254 // observable here.
2255 crate::batch::with_wave_state(|ws| {
2256 ws.pending_notify
2257 .get(&node)
2258 .map(|entry| entry.batches.len())
2259 })
2260 }
2261
2262 /// Configure the Core-global cap on pause replay buffer length. When set,
2263 /// any per-node pause buffer that would exceed `cap` drops the oldest
2264 /// message(s) from the front; the dropped count is reported back via the
2265 /// resume callback (see [`ResumeReport`]). `None` (default) means
2266 /// unbounded; messages buffer indefinitely until the lockset clears.
2267 pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2268 self.lock_state().pause_buffer_cap = cap;
2269 }
2270
2271 /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2272 /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2273 /// the last `N` DATA emissions in a circular buffer; late subscribers
2274 /// receive them as part of the per-tier handshake (between START and
2275 /// any terminal). Switching from a larger cap to a smaller cap evicts
2276 /// the front of the buffer to fit; switching to `None` drains the
2277 /// buffer entirely. Each evicted/drained handle's retain is released
2278 /// back to the binding.
2279 ///
2280 /// # Panics
2281 ///
2282 /// Panics if `node_id` is not registered.
2283 pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2284 // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2285 // express "disabled" is confusing: `push_replay_buffer` already
2286 // treats `Some(0)` as no-op, so persisting it adds nothing.
2287 let cap = match cap {
2288 Some(0) => None,
2289 other => other,
2290 };
2291 let to_release: Vec<HandleId> = {
2292 let mut s = self.lock_state();
2293 let rec = s.require_node_mut(node_id);
2294 rec.replay_buffer_cap = cap;
2295 match cap {
2296 None => rec.replay_buffer.drain(..).collect(),
2297 Some(c) => {
2298 let mut drained = Vec::new();
2299 while rec.replay_buffer.len() > c {
2300 if let Some(h) = rec.replay_buffer.pop_front() {
2301 drained.push(h);
2302 }
2303 }
2304 drained
2305 }
2306 }
2307 };
2308 for h in to_release {
2309 self.binding.release_handle(h);
2310 }
2311 }
2312
2313 /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2314 /// audit close, 2026-05-07). Default for new nodes is
2315 /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2316 /// for nodes whose pause-window emit history must be observable
2317 /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2318 /// pause-immune.
2319 ///
2320 /// # Errors
2321 ///
2322 /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2323 /// registered.
2324 /// - [`SetPausableModeError::WhilePaused`] — the node currently
2325 /// holds at least one pause lock. Changing mode mid-pause would
2326 /// lose buffered content or strand a `pending_wave` flag — resume
2327 /// all locks first.
2328 pub fn set_pausable_mode(
2329 &self,
2330 node_id: NodeId,
2331 mode: PausableMode,
2332 ) -> Result<(), SetPausableModeError> {
2333 let mut s = self.lock_state();
2334 let rec = s
2335 .nodes
2336 .get_mut(&node_id)
2337 .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2338 if rec.pause_state.is_paused() {
2339 return Err(SetPausableModeError::WhilePaused);
2340 }
2341 rec.pausable = mode;
2342 Ok(())
2343 }
2344
2345 /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2346 /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2347 /// Default is `10_000` — high enough to avoid false positives on legitimate
2348 /// fan-in cascades, low enough to surface runtime cycles within seconds.
2349 ///
2350 /// Lower this only when running adversarial / property-based tests that
2351 /// want fast cycle detection. Raise it only with concrete evidence that a
2352 /// legitimate workload needs more iterations than the default — and even
2353 /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2354 /// raising the cap.
2355 ///
2356 /// # Panics
2357 ///
2358 /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2359 /// first iteration, deadlocking any subsequent dispatcher work.
2360 pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2361 assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2362 self.lock_state().max_batch_drain_iterations = cap;
2363 }
2364
2365 /// Send a message UPSTREAM from `node_id` to each of its declared deps
2366 /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2367 ///
2368 /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2369 /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2370 /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2371 /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2372 ///
2373 /// # Routing per tier
2374 ///
2375 /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2376 /// handshake, not a routable wire signal — sending it upstream has no
2377 /// well-defined target.
2378 /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2379 /// changed" notification is its own [`Self::emit`] / commit
2380 /// responsibility; ignoring upstream DIRTY hints is safe.
2381 /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2382 /// to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2383 /// forwarded verbatim. Errors from individual deps are accumulated
2384 /// in the `dep_errors` field of the returned report.
2385 /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2386 /// [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2387 /// distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2388 /// "upstream INVALIDATE" (plain forward, no self-process). The Rust
2389 /// port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2390 /// path — upstream INVALIDATE here DOES clear dep caches and cascade.
2391 /// If a "plain forward" mode surfaces as a real consumer need, add
2392 /// `up_with_options`.
2393 /// - **Tier 6 ([`Message::Teardown`]):** translates to
2394 /// [`Self::teardown`] on each dep. Cascades per the standard
2395 /// teardown path.
2396 ///
2397 /// # Errors
2398 ///
2399 /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2400 /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2401 pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2402 // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2403 // for consistent error UX — `up(unknown, Data)` and
2404 // `up(unknown, Pause)` both report `UnknownNode` rather than
2405 // splitting on the tier.
2406 let dep_ids: Vec<NodeId> = {
2407 let s = self.lock_state();
2408 let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2409 rec.dep_ids_vec()
2410 };
2411 let tier = message.tier();
2412 if tier == 3 || tier == 5 {
2413 return Err(UpError::TierForbidden { tier });
2414 }
2415 for dep_id in dep_ids {
2416 match message {
2417 Message::Pause(lock) => {
2418 let _ = self.pause(dep_id, lock);
2419 }
2420 Message::Resume(lock) => {
2421 let _ = self.resume(dep_id, lock);
2422 }
2423 Message::Invalidate => {
2424 self.invalidate(dep_id);
2425 }
2426 Message::Teardown => {
2427 self.teardown(dep_id);
2428 }
2429 // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2430 // routing table above.
2431 _ => {}
2432 }
2433 }
2434 Ok(())
2435 }
2436
2437 /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2438 /// [`Self::resume`]. Convenience for callers that don't already have an
2439 /// id-allocation scheme; user-supplied ids work too.
2440 #[must_use]
2441 pub fn alloc_lock_id(&self) -> LockId {
2442 let mut s = self.lock_state();
2443 let id = LockId::new(s.next_lock_id);
2444 s.next_lock_id += 1;
2445 id
2446 }
2447
2448 // -------------------------------------------------------------------
2449 // Registration — unified `register()` (D030, Slice D)
2450 //
2451 // All node kinds (State / Producer / Derived / Dynamic / Operator)
2452 // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2453 // wrappers (`register_state` / `register_producer` / `register_derived`
2454 // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2455 // and delegate. There is no parallel registration path internally.
2456 // -------------------------------------------------------------------
2457
2458 /// Unified node registration (D030).
2459 ///
2460 /// `reg` describes the node's identity (deps + closure-form fn id OR
2461 /// typed-op + per-kind opts). The kind is **derived from the field
2462 /// shape**, not stored — see [`NodeKind`].
2463 ///
2464 /// Sugar wrappers below ([`Self::register_state`],
2465 /// [`Self::register_producer`], [`Self::register_derived`],
2466 /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2467 /// registration for the common kinds and delegate here. Direct callers
2468 /// that need uncommon combinations (e.g., a partial-true derived) can
2469 /// invoke this method directly.
2470 ///
2471 /// # Errors
2472 ///
2473 /// Errors are returned in evaluation order — earlier phases short-circuit
2474 /// later ones, so a single registration produces at most one variant.
2475 ///
2476 /// **Phase 1 — lock-released, side-effect-free validation:**
2477 /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2478 /// `deps` is empty. Operator nodes need at least one dep — for
2479 /// subscription-managed combinators with no declared deps, use
2480 /// [`Self::register_producer`] instead.
2481 /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2482 /// is non-sentinel for a non-state shape (deps non-empty, or
2483 /// fn_or_op present). State nodes are the only kind with an initial
2484 /// cache.
2485 ///
2486 /// **Phase 2 — operator scratch construction (lock-released):**
2487 /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2488 /// / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2489 /// must have a real seed.
2490 ///
2491 /// **Phase 3 — state-lock validation (folded with insertion under a
2492 /// single lock acquisition per /qa F1 to prevent TOCTOU between
2493 /// validation and `nodes.insert`):**
2494 /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2495 /// a registered node id.
2496 /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2497 /// ERROR) AND not resubscribable — would create a permanent wedge.
2498 ///
2499 /// All errors are construction-time invariants — the dispatcher
2500 /// rejects the registration before any reactive state is created.
2501 /// On `Err`, no node has been added and any handle retains taken on
2502 /// the way in (operator scratch seed retains via
2503 /// [`BindingBoundary::retain_handle`]) have been released
2504 /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2505 /// discipline that covers both early-return AND unwind paths.
2506 /// `Last { default }` retains its `default` handle on the same
2507 /// release path.
2508 pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
2509 let NodeRegistration {
2510 deps,
2511 fn_or_op,
2512 opts,
2513 } = reg;
2514 let NodeOpts {
2515 initial,
2516 equals,
2517 partial,
2518 is_dynamic,
2519 pausable,
2520 replay_buffer,
2521 } = opts;
2522
2523 // Derive the field shape from fn_or_op + deps.
2524 let (fn_id, op) = match fn_or_op {
2525 Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
2526 Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
2527 None => (None, None),
2528 };
2529
2530 // Phase 1 — lock-released, side-effect-free validation. Errors
2531 // here return BEFORE any handle retain is taken.
2532 //
2533 // - State (no deps + no fn + no op) is the only kind with `initial`.
2534 // - Dynamic flag only meaningful when fn + non-empty deps.
2535 // - Operator (op present) must have deps (P9: operator without deps
2536 // would skip activation — use a producer instead).
2537 let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
2538 if op.is_some() && deps.is_empty() {
2539 return Err(RegisterError::OperatorWithoutDeps);
2540 }
2541 if initial != NO_HANDLE && !is_state_shape {
2542 return Err(RegisterError::InitialOnlyForStateNodes);
2543 }
2544
2545 // Phase 2 — build per-operator scratch struct (may take handle
2546 // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
2547 // Lock-released per Slice E (D045) handshake discipline. Returns
2548 // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
2549 // dangling handles.
2550 let scratch = match op {
2551 Some(operator_op) => self.make_op_scratch(operator_op)?,
2552 None => None,
2553 };
2554
2555 // Wrap scratch in an RAII guard immediately after Phase 2. From
2556 // here on, ANY early return / unwind path correctly releases the
2557 // scratch's handle retains via `OperatorScratch::release_handles`
2558 // (Slice H /qa F2 — defense against panics between Phase 2 and
2559 // Phase 3 cleanup branch). Lock-released because the guard is
2560 // declared BEFORE `lock_state()` below — variable destruction
2561 // order is reverse declaration order, so the `MutexGuard` drops
2562 // first on any return path.
2563 let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
2564
2565 // Phase 3 — state-lock-required validation, FOLDED with insertion
2566 // under a single `lock_state()` acquisition per /qa F1. The
2567 // pre-/qa version split this into two acquisitions (one for
2568 // validation, one for `alloc_node_id` + `nodes.insert`), opening
2569 // a TOCTOU window where a concurrent `Core::complete(dep)` on a
2570 // non-resubscribable dep could slip in and recreate the wedge
2571 // `TerminalDep` was designed to prevent. Single locked region
2572 // closes the gap.
2573 let mut s = self.lock_state();
2574
2575 for &dep in &deps {
2576 if !s.nodes.contains_key(&dep) {
2577 return Err(RegisterError::UnknownDep(dep));
2578 }
2579 }
2580 // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
2581 // rejection at registration time. Adding a non-resubscribable
2582 // terminal node as a dep at registration creates a permanent wedge.
2583 for &dep in &deps {
2584 let dep_rec = s.require_node(dep);
2585 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
2586 return Err(RegisterError::TerminalDep(dep));
2587 }
2588 }
2589
2590 // Validation passed — install. Take scratch out of the guard
2591 // (disarms the release-on-drop) and continue using `s`.
2592 let installed_scratch = scratch_guard.take();
2593
2594 let id = s.alloc_node_id();
2595
2596 // `tracked`: Static derived + Operator track all deps; Dynamic
2597 // starts empty and fills via fn return; State / Producer have no
2598 // deps so tracked is empty.
2599 let tracked: HashSet<usize> = if op.is_some() {
2600 (0..deps.len()).collect()
2601 } else if is_dynamic {
2602 HashSet::new()
2603 } else if fn_id.is_some() && !deps.is_empty() {
2604 // Static derived
2605 (0..deps.len()).collect()
2606 } else {
2607 HashSet::new()
2608 };
2609
2610 let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
2611
2612 let rec = NodeRecord {
2613 dep_records,
2614 fn_id,
2615 op,
2616 is_dynamic,
2617 equals,
2618 cache: initial,
2619 has_fired_once: initial != NO_HANDLE,
2620 subscribers: HashMap::new(),
2621 subscribers_revision: 0,
2622 tracked,
2623 dirty: false,
2624 involved_this_wave: false,
2625 pause_state: PauseState::Active,
2626 pausable,
2627 replay_buffer_cap: replay_buffer,
2628 replay_buffer: VecDeque::new(),
2629 terminal: None,
2630 has_received_teardown: false,
2631 resubscribable: false,
2632 meta_companions: Vec::new(),
2633 partial,
2634 op_scratch: installed_scratch,
2635 };
2636 s.nodes.insert(id, rec);
2637 s.children.insert(id, HashSet::new());
2638 for &dep in &deps {
2639 s.children.entry(dep).or_default().insert(id);
2640 }
2641 // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
2642 // membership BEFORE dropping the state lock. Closes the
2643 // eventual-consistency window where a concurrent thread observed
2644 // the new node in `s.nodes` / new edges in `s.children` but the
2645 // registry hadn't unioned the partition yet. Today benign
2646 // (`partition_of` is debug-only); under Y1's wave engine
2647 // migration `lock_for(node)` consumes registry state on the hot
2648 // path, and the window means `lock_for` could resolve to a
2649 // partition that's been topologically unioned in `s.children`
2650 // but not yet in `registry`.
2651 //
2652 // **Lock-discipline invariant:** `state lock → registry mutex`
2653 // (one-way; never registry → state). Registry mutex is
2654 // uncontended in the X5 substrate — the only acquisition sites
2655 // are this one + `Core::set_deps` + the read-only accessors
2656 // `partition_count`/`partition_of` and (Y1+) `lock_for` — none
2657 // of which take the state lock — so the inner critical section
2658 // adds negligible latency.
2659 {
2660 let mut reg = self.registry.lock();
2661 reg.ensure_registered(id);
2662 for &dep in &deps {
2663 reg.union_nodes(id, dep);
2664 }
2665 }
2666 drop(s);
2667 self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2668 Ok(id)
2669 }
2670
2671 /// Sugar over [`Self::register`] — register a state node. `initial`
2672 /// may be [`NO_HANDLE`] to start sentinel.
2673 ///
2674 /// `partial` is accepted for surface consistency (D019); for state
2675 /// nodes it has no effect (state nodes don't fire fn).
2676 ///
2677 /// # Errors
2678 ///
2679 /// State registration is structurally simple — no deps, no op — so
2680 /// the only reachable variant is none in practice. Returns
2681 /// [`Result`] for surface consistency with [`Self::register`].
2682 pub fn register_state(
2683 &self,
2684 initial: HandleId,
2685 partial: bool,
2686 ) -> Result<NodeId, RegisterError> {
2687 self.register(NodeRegistration {
2688 deps: Vec::new(),
2689 fn_or_op: None,
2690 opts: NodeOpts {
2691 initial,
2692 partial,
2693 ..NodeOpts::default()
2694 },
2695 })
2696 }
2697
2698 /// Sugar over [`Self::register`] — register a producer node (D031,
2699 /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2700 /// via [`BindingBoundary::producer_deactivate`] when the last
2701 /// subscriber unsubscribes.
2702 ///
2703 /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2704 /// (see `graphrefly-operators::producer`) to subscribe to other Core
2705 /// nodes — the zip / concat / race / takeUntil pattern.
2706 ///
2707 /// # Errors
2708 ///
2709 /// Producer registration has no user-supplied deps, so structurally
2710 /// none of [`RegisterError`]'s variants are reachable. Returns
2711 /// [`Result`] for surface consistency with [`Self::register`].
2712 pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2713 self.register(NodeRegistration {
2714 deps: Vec::new(),
2715 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2716 opts: NodeOpts {
2717 // Producers have no deps — the first-run gate is degenerate.
2718 partial: true,
2719 ..NodeOpts::default()
2720 },
2721 })
2722 }
2723
2724 /// Sugar over [`Self::register`] — register a derived (static) node.
2725 /// `partial` controls the R2.5.3 first-run gate (D011).
2726 ///
2727 /// # Errors
2728 ///
2729 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2730 /// registered.
2731 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2732 /// resubscribable.
2733 pub fn register_derived(
2734 &self,
2735 deps: &[NodeId],
2736 fn_id: FnId,
2737 equals: EqualsMode,
2738 partial: bool,
2739 ) -> Result<NodeId, RegisterError> {
2740 self.register(NodeRegistration {
2741 deps: deps.to_vec(),
2742 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2743 opts: NodeOpts {
2744 equals,
2745 partial,
2746 ..NodeOpts::default()
2747 },
2748 })
2749 }
2750
2751 /// Sugar over [`Self::register`] — register a dynamic node (fn
2752 /// declares its actually-tracked dep indices per fire). `partial`
2753 /// controls the R2.5.3 first-run gate (D011).
2754 ///
2755 /// # Errors
2756 ///
2757 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2758 /// registered.
2759 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2760 /// resubscribable.
2761 pub fn register_dynamic(
2762 &self,
2763 deps: &[NodeId],
2764 fn_id: FnId,
2765 equals: EqualsMode,
2766 partial: bool,
2767 ) -> Result<NodeId, RegisterError> {
2768 self.register(NodeRegistration {
2769 deps: deps.to_vec(),
2770 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2771 opts: NodeOpts {
2772 equals,
2773 partial,
2774 is_dynamic: true,
2775 ..NodeOpts::default()
2776 },
2777 })
2778 }
2779
2780 /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2781 /// box for an operator variant, taking any required handle retains.
2782 /// Shared between `register_operator` (initial install) and
2783 /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2784 ///
2785 /// # Errors
2786 ///
2787 /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2788 /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2789 /// must have a real seed). Refcount discipline: the seed-sentinel
2790 /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2791 /// `Err` leaves no handles dangling.
2792 fn make_op_scratch(
2793 &self,
2794 op: OperatorOp,
2795 ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2796 use crate::op_state::{
2797 DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2798 TakeWhileState,
2799 };
2800 // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2801 // BEFORE retain_handle so an Err return leaves no handles dangling.
2802 //
2803 // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2804 // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2805 // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2806 // yet — no leak. If `retain_handle` panics after Box succeeds,
2807 // the `Box<State>` is dropped on unwind; State has no handle yet
2808 // (we haven't touched the registry refcount), so still no leak.
2809 // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2810 // cover panics AFTER make_op_scratch returns.
2811 match op {
2812 OperatorOp::Scan { seed, .. } => {
2813 if seed == NO_HANDLE {
2814 return Err(RegisterError::OperatorSeedSentinel);
2815 }
2816 let state = Box::new(ScanState { acc: seed });
2817 self.binding.retain_handle(seed);
2818 Ok(Some(state))
2819 }
2820 OperatorOp::Reduce { seed, .. } => {
2821 if seed == NO_HANDLE {
2822 return Err(RegisterError::OperatorSeedSentinel);
2823 }
2824 let state = Box::new(ReduceState { acc: seed });
2825 self.binding.retain_handle(seed);
2826 Ok(Some(state))
2827 }
2828 OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2829 OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2830 OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2831 OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2832 OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2833 OperatorOp::Last { default } => {
2834 let state = Box::new(LastState {
2835 latest: NO_HANDLE,
2836 default,
2837 });
2838 if default != NO_HANDLE {
2839 self.binding.retain_handle(default);
2840 }
2841 Ok(Some(state))
2842 }
2843 OperatorOp::Map { .. }
2844 | OperatorOp::Filter { .. }
2845 | OperatorOp::Combine { .. }
2846 | OperatorOp::WithLatestFrom { .. }
2847 | OperatorOp::Merge => Ok(None),
2848 }
2849 }
2850
2851 /// Sugar over [`Self::register`] — register a built-in operator node
2852 /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
2853 /// lives in `fire_operator`; `op` selects which per-operator FFI
2854 /// method on [`BindingBoundary`] gets called per fire.
2855 ///
2856 /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
2857 /// [`Last`] with a default), the seed/default handle is captured
2858 /// into the appropriate
2859 /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
2860 /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
2861 /// share via [`BindingBoundary::retain_handle`].
2862 ///
2863 /// # Errors
2864 ///
2865 /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
2866 /// [`Self::register_producer`] instead).
2867 /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
2868 /// [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
2869 /// [`NO_HANDLE`] seed.
2870 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2871 /// registered.
2872 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2873 /// resubscribable.
2874 pub fn register_operator(
2875 &self,
2876 deps: &[NodeId],
2877 op: OperatorOp,
2878 opts: OperatorOpts,
2879 ) -> Result<NodeId, RegisterError> {
2880 self.register(NodeRegistration {
2881 deps: deps.to_vec(),
2882 fn_or_op: Some(NodeFnOrOp::Op(op)),
2883 opts: NodeOpts {
2884 equals: opts.equals,
2885 partial: opts.partial,
2886 ..NodeOpts::default()
2887 },
2888 })
2889 }
2890
2891 // -------------------------------------------------------------------
2892 // Subscription
2893 // -------------------------------------------------------------------
2894
2895 /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
2896 /// dropping the handle unsubscribes the sink. Per §10.12, no manual
2897 /// `unsubscribe(node, id)` call is required.
2898 ///
2899 /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
2900 /// the START handshake fires. The handshake contents depend on node
2901 /// state:
2902 /// - Sentinel cache + live (non-terminal): `[START]`
2903 /// - Cached + live: `[START, DATA(handle)]`
2904 /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
2905 /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
2906 ///
2907 /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
2908 /// marked resubscribable via [`Self::set_resubscribable`] AND has
2909 /// terminated, the subscribe call first **resets** the node — clears
2910 /// `terminal`, `has_fired_once`, `has_received_teardown`, all
2911 /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
2912 /// drains the pause lockset. The new subscriber then receives a fresh
2913 /// `[START]` (cache may survive for state nodes; sentinel for compute).
2914 ///
2915 /// Activation (R2.2.3 step 5): if this is the first subscriber and the
2916 /// node is a derived/dynamic compute, recursively activate deps so their
2917 /// cached handles fill our `dep_handles`.
2918 #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
2919 pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
2920 // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
2921 //
2922 // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
2923 // through, cross-thread blocks). This is the cross-thread
2924 // serialization point that preserves R1.3.5.a happens-after
2925 // ordering across the lock-released handshake fire.
2926 // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
2927 // reset if applicable, snapshot handshake state, install sink
2928 // in `subscribers`. Drop state lock.
2929 // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
2930 // `[Start]` / `[Data(cache)]?` / `[Complete]?` / `[Error(h)]?`
2931 // / `[Teardown]?`. Empty tiers are skipped. Sink callbacks
2932 // may re-enter Core freely — same-thread re-entry passes
2933 // through `wave_owner` reentrantly.
2934 // 4. Run activation under `run_wave` if needed (first subscriber
2935 // on a non-state node).
2936 // 5. Drop `wave_owner`.
2937 //
2938 // Race-fix discipline: the sink is installed in `subscribers`
2939 // BEFORE the state lock drops, so concurrent threads that
2940 // acquire `wave_owner` after our scope sees the sink already
2941 // registered. Cross-thread emits block on `wave_owner` until
2942 // we drop it, ensuring all our handshake calls land before
2943 // any concurrent wave's flush observes the sink.
2944
2945 // Acquire the partition's `wave_owner` first — cross-thread
2946 // serialization point. Per Slice Y1 / Phase E (2026-05-08),
2947 // subscribe routes through the per-partition lock instead of
2948 // a Core-global one. Subscribe touches only `node_id`'s
2949 // partition (activation cascade stays within the partition
2950 // because dep edges are unioned). `partition_wave_owner_lock_arc`
2951 // does retry-validate against concurrent union/split.
2952 // `lock_arc()` is `!Send`; same-thread reentrant.
2953 let _wave_guard = self.partition_wave_owner_lock_arc(node_id);
2954
2955 let (sub_id, tier_slices, needs_activation, did_reset) = {
2956 let mut s = self.lock_state();
2957 let sub_id = s.alloc_sub_id();
2958
2959 // Resubscribable reset: terminal + flagged → clear lifecycle
2960 // state so the incoming subscriber starts fresh. F3 audit
2961 // guard: a node that has received TEARDOWN (R2.6.4) is
2962 // permanently destroyed at this layer; resurrecting it via a
2963 // late subscribe is a category error. COMPLETE/ERROR is
2964 // recoverable for resubscribable nodes; TEARDOWN is not. The
2965 // handshake will still replay the terminal in the non-reset
2966 // branch so the late subscriber sees a clean
2967 // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
2968 let needs_reset = {
2969 let rec = s.require_node(node_id);
2970 rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
2971 };
2972 if needs_reset {
2973 self.reset_for_fresh_lifecycle(&mut s, node_id);
2974 }
2975
2976 // Snapshot handshake state under lock.
2977 let (cache, is_state, first_subscriber, terminal, torn_down) = {
2978 let rec = s.require_node(node_id);
2979 (
2980 rec.cache,
2981 rec.is_state(),
2982 rec.subscribers.is_empty(),
2983 rec.terminal,
2984 rec.has_received_teardown,
2985 )
2986 };
2987
2988 // Build per-tier handshake slices. Each non-empty slice is
2989 // fired as a separate sink call (R1.3.5.a tier-split).
2990 let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
2991 tier_slices.push(vec![Message::Start]);
2992 if cache != NO_HANDLE {
2993 tier_slices.push(vec![Message::Data(cache)]);
2994 }
2995 // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
2996 // [Start] (and the cache slice, if present) and any terminal.
2997 // Each buffered handle becomes a separate per-tier slice so
2998 // late subscribers see the historical Data sequence as
2999 // distinct sink calls.
3000 //
3001 // Dedupe: when a cache slice is present and the buffer's last
3002 // entry is the same handle (the typical case — cache always
3003 // tracks the last DATA emitted, and the buffer's tail entry
3004 // is that same DATA), skip the last buffer entry to avoid
3005 // delivering Data(cache) twice. For state nodes whose cache
3006 // survives unsubscribe, the buffer may have older entries
3007 // the cache doesn't reflect; the dedupe only drops the
3008 // single trailing entry that equals cache. (QA A1, 2026-05-07)
3009 let replay_handles: Vec<HandleId> = {
3010 let rec = s.require_node(node_id);
3011 let cap = rec.replay_buffer_cap.unwrap_or(0);
3012 if cap == 0 {
3013 Vec::new()
3014 } else {
3015 let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
3016 if cache != NO_HANDLE && v.last() == Some(&cache) {
3017 v.pop();
3018 }
3019 v
3020 }
3021 };
3022 for h in &replay_handles {
3023 tier_slices.push(vec![Message::Data(*h)]);
3024 }
3025 if let Some(t) = terminal {
3026 tier_slices.push(vec![match t {
3027 TerminalKind::Complete => Message::Complete,
3028 TerminalKind::Error(h) => Message::Error(h),
3029 }]);
3030 }
3031 if torn_down {
3032 tier_slices.push(vec![Message::Teardown]);
3033 }
3034
3035 // Install sink BEFORE dropping state lock so any thread that
3036 // subsequently acquires `wave_owner` (after our scope ends)
3037 // sees the sink already registered.
3038 //
3039 // Slice X4 / D2: bump `subscribers_revision` alongside the
3040 // insert so a pending_notify entry opened earlier in the same
3041 // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
3042 // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
3043 // its next `queue_notify` push — making the new sink visible
3044 // to subsequent emits' flush slices, while the pre-subscribe
3045 // batch's snapshot stays frozen so we don't double-deliver
3046 // earlier emits via the wave's flush AND the new sub's
3047 // handshake replay.
3048 {
3049 let rec = s.require_node_mut(node_id);
3050 rec.subscribers.insert(sub_id, sink.clone());
3051 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3052 }
3053
3054 let needs_activation = first_subscriber && !is_state;
3055 (sub_id, tier_slices, needs_activation, needs_reset)
3056 // state lock drops here
3057 };
3058
3059 // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
3060 // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
3061 // entry (clearing both `store` and any residual `current_cleanup`).
3062 // The new subscriber's first invoke_fn sees a fresh empty store.
3063 // Fires AFTER the state lock drops so the binding's
3064 // `node_ctx.lock()` can't deadlock against Core's state lock — and
3065 // BEFORE the handshake so the wipe is observable before any
3066 // user-visible interaction with the new lifecycle.
3067 if did_reset {
3068 self.binding.wipe_ctx(node_id);
3069 }
3070
3071 // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
3072 // thread re-entry passes through `wave_owner` reentrantly.
3073 // Cross-thread emits block at `wave_owner` until our scope ends.
3074 //
3075 // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
3076 // `catch_unwind`. The sink is installed in `subscribers` BEFORE
3077 // the handshake fires (load-bearing — concurrent threads observe
3078 // the sink immediately). If a sink panics on tier N, the panic
3079 // would otherwise unwind out of `subscribe` BEFORE the
3080 // `Subscription` handle is constructed, leaving the sink
3081 // registered in `subscribers` with no user-held handle to drop.
3082 // Subsequent waves' `flush_notifications` would re-fire the
3083 // panicking sink forever.
3084 //
3085 // On panic: remove the sink from `subscribers` (via the
3086 // already-allocated `sub_id`), drop `_wave_guard` cleanly via
3087 // RAII, and resume the unwind so the user observes the panic at
3088 // the call site. Same effect as the user dropping the
3089 // `Subscription` immediately, but pre-emptive.
3090 for slice in &tier_slices {
3091 let sink_clone = sink.clone();
3092 let slice_ref: &[Message] = slice;
3093 let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
3094 if let Err(panic_payload) = result {
3095 // Remove the orphaned sink. Best-effort: if the node was
3096 // since torn down (e.g., the sink itself called teardown
3097 // before panicking), the entry may already be gone.
3098 {
3099 let mut s = self.lock_state();
3100 if let Some(rec) = s.nodes.get_mut(&node_id) {
3101 rec.subscribers.remove(&sub_id);
3102 // Slice X4 / D2: keep revision-tracked snapshot
3103 // discipline consistent with the install site —
3104 // any pending_notify entry that already absorbed
3105 // the panicking sink under the post-install
3106 // revision should start a fresh batch on its
3107 // next queue_notify push.
3108 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3109 }
3110 }
3111 std::panic::resume_unwind(panic_payload);
3112 }
3113 }
3114
3115 // Run activation if needed. `run_wave_for(node_id)` acquires
3116 // only the partitions transitively touched from `node_id`
3117 // (downstream cascade + meta-companion teardown reach) — same-
3118 // partition activation re-enters reentrantly. Slice Y1 / Phase E.
3119 if needs_activation {
3120 self.run_wave_for(node_id, |this| {
3121 let mut s = this.lock_state();
3122 this.activate_derived(&mut s, node_id);
3123 });
3124 }
3125
3126 Subscription {
3127 state: Arc::downgrade(&self.state),
3128 node_id,
3129 sub_id,
3130 }
3131 // _wave_guard drops here, releasing wave_owner.
3132 }
3133
3134 /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3135 /// reset their terminal-lifecycle state on a fresh subscribe — see
3136 /// [`Self::subscribe`].
3137 ///
3138 /// Configuration call — must be made before the node has any active
3139 /// subscribers, since changing the policy mid-flight would surprise
3140 /// existing observers.
3141 ///
3142 /// # Panics
3143 ///
3144 /// Panics if the node has subscribers (the policy is observable
3145 /// behavior; changing it after the fact would change semantics for
3146 /// existing sinks).
3147 pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3148 let mut s = self.lock_state();
3149 let rec = s.require_node_mut(node_id);
3150 assert!(
3151 rec.subscribers.is_empty(),
3152 "set_resubscribable: node already has subscribers; \
3153 configure resubscribable before any subscribe call"
3154 );
3155 rec.resubscribable = resubscribable;
3156 }
3157
3158 /// Reset a resubscribable node's terminal-lifecycle state. Called from
3159 /// `subscribe` when a late subscriber arrives at a flagged node.
3160 ///
3161 /// Released: terminal-slot retain (Error handle), all per-dep terminal
3162 /// retains (Error handles), all data_batch retains.
3163 /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
3164 /// dep_records to sentinel, the pause lockset (any held locks are
3165 /// released — replay buffer drops silently because there are no
3166 /// subscribers to flush to).
3167 fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
3168 // Phase 1: collect wave-state handle releases + take the old
3169 // op_scratch + reset other state. Take all mutations under one
3170 // borrow so the post-borrow phases don't re-walk dep_records.
3171 let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
3172 let rec = s.require_node_mut(node_id);
3173 let mut hs = Vec::new();
3174 if let Some(TerminalKind::Error(h)) = rec.terminal {
3175 hs.push(h);
3176 }
3177 for dr in &rec.dep_records {
3178 if let Some(TerminalKind::Error(h)) = dr.terminal {
3179 hs.push(h);
3180 }
3181 for &h in &dr.data_batch {
3182 hs.push(h);
3183 }
3184 // Slice C-3 /qa: also release `prev_data`. Prior to this
3185 // collection, `reset_for_fresh_lifecycle` overwrote
3186 // `dr.prev_data = NO_HANDLE` without releasing the old
3187 // handle, leaking one share per dep per resubscribable
3188 // cycle. The leak was masked because no test exercised
3189 // the per-dep `prev_data` retain across a lifecycle
3190 // reset; surfaced by the T1 tightening of
3191 // `last_releases_buffered_latest_on_lifecycle_reset`.
3192 if dr.prev_data != NO_HANDLE {
3193 hs.push(dr.prev_data);
3194 }
3195 }
3196 // Take pause_state's buffer; collect its payload handles for
3197 // release (they were retained at queue_notify time; buffer
3198 // drops because the new subscriber starts fresh).
3199 let mut pulled = Vec::new();
3200 if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
3201 for msg in buffer.drain(..) {
3202 if let Some(h) = msg.payload_handle() {
3203 pulled.push(h);
3204 }
3205 }
3206 }
3207 // Slice E1: drain the replay buffer too — the new subscriber
3208 // gets a fresh lifecycle and shouldn't see prior emissions.
3209 for h in rec.replay_buffer.drain(..) {
3210 pulled.push(h);
3211 }
3212 // Reset wave / lifecycle state.
3213 rec.terminal = None;
3214 rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
3215 rec.has_received_teardown = false;
3216 for dr in &mut rec.dep_records {
3217 dr.prev_data = NO_HANDLE;
3218 dr.data_batch.clear();
3219 dr.terminal = None;
3220 dr.dirty = false;
3221 dr.involved_this_wave = false;
3222 }
3223 rec.pause_state = PauseState::Active;
3224 rec.involved_this_wave = false;
3225 rec.dirty = false;
3226 // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
3227 // the post-reset first fire repopulates from the fn's
3228 // returned tracked-deps set.
3229 if rec.is_dynamic {
3230 rec.tracked.clear();
3231 }
3232 // Take the old scratch out so we can release its handles and
3233 // install a fresh one. Operator op is copied for the
3234 // rebuild step below.
3235 let prev_op = rec.op;
3236 let old = std::mem::take(&mut rec.op_scratch);
3237 (prev_op, old, hs, pulled)
3238 };
3239
3240 // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
3241 // build the fresh scratch FIRST, taking new retains on any
3242 // seed/default handles. This must run BEFORE Phase 3 releases
3243 // the old scratch's shares — if old `acc` (Scan/Reduce) or old
3244 // `latest` (Last) aliases the new `seed`/`default` (common:
3245 // `fold(seed, x) == seed` interns to the same registry entry),
3246 // releasing the old share first could collapse the binding's
3247 // registry slot to zero (production bindings remove the value
3248 // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
3249 // and a subsequent `retain_handle` on the new seed would bump a
3250 // refcount on a slot whose value has been removed. By taking
3251 // the new retains first, we floor the refcount at ≥1 before
3252 // any release happens.
3253 let new_scratch = match prev_op {
3254 // Slice H: the OperatorOp stored on NodeRecord previously
3255 // passed `make_op_scratch` validation at registration time
3256 // (RegisterError::OperatorSeedSentinel for Scan/Reduce
3257 // sentinel seeds; Last { default: NO_HANDLE } is accepted
3258 // and never errors). Re-running it here on the same op
3259 // value is structurally guaranteed to succeed.
3260 Some(op) => self
3261 .make_op_scratch(op)
3262 .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
3263 None => None,
3264 };
3265
3266 // Phase 3: NOW release handles owned by the old op_scratch
3267 // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
3268 // default). Safe per Phase 2's retain-first floor. The boxed
3269 // value is consumed and dropped after.
3270 if let Some(scratch) = old_scratch.as_mut() {
3271 scratch.release_handles(&*self.binding);
3272 }
3273 drop(old_scratch);
3274
3275 // Phase 4: install the fresh scratch.
3276 {
3277 let rec = s.require_node_mut(node_id);
3278 rec.op_scratch = new_scratch;
3279 }
3280
3281 // Phase 5: release wave-state handles collected in phase 1.
3282 for h in handles_to_release {
3283 self.binding.release_handle(h);
3284 }
3285 for h in pause_buffer_payloads {
3286 self.binding.release_handle(h);
3287 }
3288 }
3289
3290 /// Activate `root` and any transitive uncached compute deps so their
3291 /// caches fill our dep_handles slots.
3292 ///
3293 /// Slice A close (M1): pure dep-walk + dep_handles population +
3294 /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3295 /// call — the outer caller (typically `Core::subscribe` via
3296 /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3297 /// around `invoke_fn`.
3298 ///
3299 /// Walk shape:
3300 /// 1. **Discover phase (DFS via Vec stack):** starting at `root`,
3301 /// walk transitively-needing-activation deps via the `deps`
3302 /// chain. Build an ordering where each node appears AFTER all
3303 /// of its uncached compute deps — i.e., reverse topological
3304 /// among the visited subgraph.
3305 /// 2. **Deliver phase (forward iteration):** for each visited
3306 /// node in dep-first order, push deps' caches into the node's
3307 /// `dep_handles` slots. Caches that were sentinel pre-walk are
3308 /// filled because their parent's fn fires later in the wave's
3309 /// drain loop and `commit_emission` propagates new caches forward
3310 /// via `deliver_data_to_consumer` — the same path this method
3311 /// uses for the initial seed. Adds the node to `pending_fires`
3312 /// if its tracked-deps gate is satisfied; the wave-engine drain
3313 /// fires the fn lock-released around `invoke_fn`.
3314 pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3315 // Phase 1: discover. DFS to collect every compute node reachable
3316 // via deps that doesn't yet have a cache and hasn't fired.
3317 // Record them in dep-first (post-order) so phase 2 can deliver
3318 // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3319 // means "first visit: push deps then re-push self with finalize=true";
3320 // `finalize=true` means "deps have been expanded, append self to
3321 // `order`."
3322 let mut visited: HashSet<NodeId> = HashSet::new();
3323 let mut order: Vec<NodeId> = Vec::new();
3324 let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3325 while let Some((id, finalize)) = stack.pop() {
3326 if finalize {
3327 order.push(id);
3328 continue;
3329 }
3330 if !visited.insert(id) {
3331 continue;
3332 }
3333 stack.push((id, true));
3334 let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3335 for dep_id in dep_ids {
3336 let (dep_is_state, dep_cache, dep_has_fired) = {
3337 let dep_rec = s.require_node(dep_id);
3338 (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3339 };
3340 if !dep_is_state
3341 && dep_cache == NO_HANDLE
3342 && !dep_has_fired
3343 && !visited.contains(&dep_id)
3344 {
3345 stack.push((dep_id, false));
3346 }
3347 }
3348 }
3349
3350 // Phase 2: deliver caches in dep-first order. For each node, walk
3351 // its deps and call `deliver_data_to_consumer` for any with caches.
3352 // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3353 // to walk; queue them directly into `pending_fires` so the wave
3354 // engine fires their fn once on activation.
3355 //
3356 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3357 // per-thread WaveState. State lock + thread_local borrow are
3358 // independent; deliver_data_to_consumer also writes pending_fires
3359 // via WaveState (no nested with_wave_state borrows here).
3360 for &id in &order {
3361 let (dep_ids, is_producer) = {
3362 let rec = s.require_node(id);
3363 (rec.dep_ids_vec(), rec.is_producer())
3364 };
3365 if is_producer {
3366 crate::batch::with_wave_state(|ws| {
3367 ws.pending_fires.insert(id);
3368 });
3369 continue;
3370 }
3371 for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3372 let dep_cache = s.require_node(dep_id).cache;
3373 if dep_cache != NO_HANDLE {
3374 self.deliver_data_to_consumer(s, id, i, dep_cache);
3375 }
3376 }
3377 }
3378 }
3379
3380 // -------------------------------------------------------------------
3381 // Emission entry point
3382 // -------------------------------------------------------------------
3383
3384 /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3385 /// fn fires for downstream).
3386 ///
3387 /// Silent no-op if the node has already terminated (R1.3.4). The handle
3388 /// passed in is still released by the caller's binding-side intern path
3389 /// — no implicit retain is consumed when the call short-circuits.
3390 ///
3391 /// # Panics
3392 ///
3393 /// Panics if `node_id` is not a state node, or if `new_handle` is
3394 /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3395 pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3396 assert!(
3397 new_handle != NO_HANDLE,
3398 "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3399 );
3400 // Validate + terminal short-circuit under a brief lock.
3401 //
3402 // emit() is valid for State and Producer nodes — both are
3403 // intrinsic sources whose values are not derived from declared
3404 // deps. State nodes get emit() from user code; Producer nodes
3405 // get emit() from sink callbacks the producer's build closure
3406 // registered (sink fires → re-enter Core → emit on self).
3407 // Derived / Dynamic / Operator nodes emit via their fn return
3408 // value through fire_fn / fire_operator, NOT via emit().
3409 {
3410 let s = self.lock_state();
3411 let rec = s.require_node(node_id);
3412 assert!(
3413 rec.is_state() || rec.is_producer(),
3414 "emit() is for state or producer nodes only; \
3415 derived/dynamic/operator emit via their fn return value"
3416 );
3417 if rec.terminal.is_some() {
3418 drop(s);
3419 // Caller's intern share would otherwise leak; cache slot
3420 // ownership doesn't transfer because we're not advancing
3421 // cache. Released lock-released so the binding can't
3422 // deadlock against an internal binding mutex.
3423 self.binding.release_handle(new_handle);
3424 return;
3425 }
3426 }
3427 // Run wave on `node_id`'s touched partitions. Slice Y1 / Phase E:
3428 // emit cascades only via `s.children`, all unioned with `node_id`'s
3429 // partition by construction (dep edges = union edges). Common case
3430 // is a single-partition acquire — disjoint-partition emits run
3431 // truly parallel under per-partition `wave_owner`.
3432 self.run_wave_for(node_id, |this| {
3433 this.commit_emission(node_id, new_handle);
3434 });
3435 }
3436
3437 /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3438 #[must_use]
3439 pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3440 self.lock_state().require_node(node_id).cache
3441 }
3442
3443 /// Whether the node's fn has fired at least once (compute) OR it has had
3444 /// a non-sentinel value (state).
3445 #[must_use]
3446 pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3447 self.lock_state().require_node(node_id).has_fired_once
3448 }
3449
3450 // -------------------------------------------------------------------
3451 // Read-side inspection helpers (Slice E+, M2)
3452 //
3453 // Non-panicking accessors for graph-layer introspection (`describe()`,
3454 // `observe()`, `node_count()`). All five return Option/empty for
3455 // unknown ids — they're meant to back walks over `node_ids()` where
3456 // the caller already knows the ids are valid, plus debugging /
3457 // dry-run probes that prefer "absence" over "panic".
3458 //
3459 // Keep these strictly read-only: no wave entry, no binding callbacks,
3460 // no lock release. Each takes the state lock once, copies a small
3461 // value, and drops the lock.
3462 // -------------------------------------------------------------------
3463
3464 /// Snapshot of every registered `NodeId` in unspecified order. The
3465 /// order matches `HashMap` iteration over the internal node table —
3466 /// callers that need stable ordering should track names at the
3467 /// `Graph` layer (canonical spec §3.5 namespace).
3468 #[must_use]
3469 pub fn node_ids(&self) -> Vec<NodeId> {
3470 self.lock_state().nodes.keys().copied().collect()
3471 }
3472
3473 /// Total number of nodes registered in this Core.
3474 #[must_use]
3475 pub fn node_count(&self) -> usize {
3476 self.lock_state().nodes.len()
3477 }
3478
3479 /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3480 /// **derived** from the field shape per D030 — see [`NodeKind`].
3481 #[must_use]
3482 pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3483 self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3484 }
3485
3486 /// Snapshot of the node's deps in declaration order. Empty for
3487 /// unknown nodes or for state nodes (which have no deps).
3488 #[must_use]
3489 pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3490 self.lock_state()
3491 .nodes
3492 .get(&node_id)
3493 .map(NodeRecord::dep_ids_vec)
3494 .unwrap_or_default()
3495 }
3496
3497 /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3498 /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3499 /// the node emitted. `None` for live nodes or unknown ids.
3500 #[must_use]
3501 pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3502 self.lock_state()
3503 .nodes
3504 .get(&node_id)
3505 .and_then(|r| r.terminal)
3506 }
3507
3508 /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3509 /// queued but the matching tier-3 settle has not yet flushed).
3510 /// `false` for unknown ids. Mostly useful for `describe()` status
3511 /// classification (R3.6.1 `"dirty"`).
3512 #[must_use]
3513 pub fn is_dirty(&self, node_id: NodeId) -> bool {
3514 self.lock_state()
3515 .nodes
3516 .get(&node_id)
3517 .is_some_and(|r| r.dirty)
3518 }
3519
3520 /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
3521 /// the companions added via [`Self::add_meta_companion`]). Empty
3522 /// for unknown ids or for nodes with no companions registered.
3523 ///
3524 /// Used by the graph layer's `signal_invalidate` to filter meta
3525 /// children out of the broadcast (canonical R3.7.2 — meta caches
3526 /// are preserved across graph-wide INVALIDATE).
3527 #[must_use]
3528 pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
3529 self.lock_state()
3530 .nodes
3531 .get(&parent)
3532 .map(|r| r.meta_companions.clone())
3533 .unwrap_or_default()
3534 }
3535
3536 // -------------------------------------------------------------------
3537 // Wave engine — lives in `crate::batch` (Slice C-1 module split;
3538 // Slice A close M1 refactor lifted the binding-callback re-entrance
3539 // restrictions). The methods are still on `Core`; see `batch.rs` for:
3540 //
3541 // - `run_wave` — wave entry, manages own locking.
3542 // - `drain_and_flush` — drain phase, lock-released around invoke_fn.
3543 // - `commit_emission` — lock-released around custom_equals.
3544 // - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
3545 // `flush_notifications` — wave-engine helpers.
3546 // -------------------------------------------------------------------
3547}
3548
3549// -----------------------------------------------------------------------
3550// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
3551// -----------------------------------------------------------------------
3552
3553impl Core {
3554 /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
3555 /// this call:
3556 ///
3557 /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
3558 /// termination).
3559 /// - The node's fn no longer fires.
3560 /// - The node's cache is preserved (last value still observable via
3561 /// `cache_of`).
3562 /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
3563 /// - Auto-cascade gating (Lock 2.B): each child that has all of its
3564 /// deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
3565 /// dominates COMPLETE — if any of a child's deps emitted ERROR, the
3566 /// child auto-cascades that ERROR instead.
3567 ///
3568 /// Idempotent: calling `complete` on an already-terminal node is a no-op.
3569 ///
3570 /// # Panics
3571 ///
3572 /// Panics if `node_id` is unknown.
3573 pub fn complete(&self, node_id: NodeId) {
3574 self.emit_terminal(node_id, TerminalKind::Complete);
3575 }
3576
3577 /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
3578 /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
3579 /// already interned the error value before this call. Same lifecycle
3580 /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
3581 /// cascade gating.
3582 ///
3583 /// # Panics
3584 ///
3585 /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
3586 pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
3587 assert!(
3588 error_handle != NO_HANDLE,
3589 "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
3590 );
3591 self.emit_terminal(node_id, TerminalKind::Error(error_handle));
3592 // The caller's intern share for `error_handle` is NOT transferred
3593 // to any slot — `terminate_node` takes its OWN retain for every
3594 // populated `terminal` and `dep_terminals` slot. Release the
3595 // caller's share here (mirrors `Core::emit`'s short-circuit
3596 // release on terminal). Without this, every `error()` call leaks
3597 // one binding-side handle ref. Slice A-bigger /qa item D fix.
3598 self.binding.release_handle(error_handle);
3599 }
3600
3601 fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
3602 {
3603 let s = self.lock_state();
3604 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3605 }
3606 // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
3607 // COMPLETE / ERROR cascade follows `s.children` (in-partition by
3608 // union-find construction). The thunk acquires its own state lock
3609 // to queue the cascade.
3610 self.run_wave_for(node_id, |this| {
3611 let mut s = this.lock_state();
3612 this.terminate_node(&mut s, node_id, terminal);
3613 });
3614 }
3615
/// Set the node's terminal slot, queue the wire message, and cascade to
/// children. Idempotent on already-terminal node (no-op).
///
/// Iterative implementation (Slice A-bigger, M1-close): a work-queue
/// drives the cascade so deep linear chains don't overflow the OS
/// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
///
/// For each `(id, t)` popped from the work queue, in order:
///
/// 1. Skip if `id` is already terminal.
/// 2. Retain the error handle (ERROR only) for the node's `terminal`
///    slot and set the slot.
/// 3. Queue an eager wipe when the node is resubscribable with no
///    subscribers, and drop any pending fn-fire for the node.
/// 4. Drain the pause buffer (collapsing the pause state to `Active`)
///    and the replay buffer, releasing every drained handle.
/// 5. Queue `Message::Complete` / `Message::Error(h)` via
///    `queue_notify`.
/// 6. For every child that lists `id` as a dep: record `t` in the
///    child's per-dep terminal slot (retaining the error handle for
///    that slot), then either push the child onto the work queue, queue
///    it for fn-fire, or do nothing, per [`ChildAction`].
///
/// The caller must hold the state lock (`s`) for the whole call.
fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
    // LIFO work stack of (node, terminal-to-apply) pairs.
    let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
    while let Some((id, t)) = work.pop() {
        if s.require_node(id).terminal.is_some() {
            continue; // Idempotent — already terminal.
        }
        // Take a refcount share for the terminal slot so the error
        // handle outlives the binding-side intern's transient share.
        if let TerminalKind::Error(h) = t {
            self.binding.retain_handle(h);
        }
        // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
        // terminating with no live subscribers, queue eager
        // `wipe_ctx` for the wave's lock-released drain. This is the
        // mutually-exclusive complement of the `Subscription::Drop`
        // wipe site: when the LAST sub drops first then terminate
        // fires, subs are empty here and we queue; when terminate
        // fires WITH subs still alive, we DON'T queue (subs not
        // empty), and `Subscription::Drop` will fire wipe directly
        // when those subs eventually drop. Either way, exactly one
        // wipe fires per terminal lifecycle.
        let queue_wipe = {
            let rec = s.require_node(id);
            rec.resubscribable && rec.subscribers.is_empty()
        };
        s.require_node_mut(id).terminal = Some(t);
        // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
        // and pending_wipes both live on per-thread WaveState. Single
        // borrow handles the queue-wipe push and the pending_fires
        // remove.
        crate::batch::with_wave_state(|ws| {
            if queue_wipe {
                ws.pending_wipes.push(id);
            }
            // Drain pending fires for this node — fn won't fire on a
            // terminal node.
            ws.pending_fires.remove(&id);
        });
        // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
        // when terminating (the canonical case is the R1.3.8.c overflow
        // ERROR synthesis path), drain the pause buffer and release
        // each payload's queue_notify-time retain. Without this, the
        // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
        // Subscribers receive the terminal directly via the cascade
        // below (tier-5 bypasses the pause buffer); the buffered
        // content is moot post-terminal.
        let drained: Vec<HandleId> = {
            let rec = s.require_node_mut(id);
            let mut drained: Vec<HandleId> = Vec::new();
            if rec.pause_state.is_paused() {
                // Take the buffered messages out, then collapse the
                // pause state to Active so subsequent code observes a
                // clean lifecycle. Idempotent on Active (no-op).
                let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
                if let PauseState::Paused { buffer, .. } = prev {
                    // Only messages that carry a payload handle hold a
                    // retain that needs balancing.
                    drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
                }
            }
            // QA A4 (2026-05-07): drain replay buffer on terminate. A
            // non-resubscribable terminal ends the lifecycle; without
            // this drain the buffer's retains leak until `Drop for
            // CoreState`. Resubscribable nodes' replay buffers are
            // also drained (when they're hit by a terminal cascade);
            // a fresh subscribe rebuilds the buffer from scratch as
            // part of `reset_for_fresh_lifecycle`.
            drained.extend(rec.replay_buffer.drain(..));
            drained
        };
        // Release outside the record borrow taken above.
        for h in drained {
            self.binding.release_handle(h);
        }
        // Queue the wire message (tier 5 — bypasses pause buffer).
        let msg = match t {
            TerminalKind::Complete => Message::Complete,
            TerminalKind::Error(h) => Message::Error(h),
        };
        self.queue_notify(s, id, msg);
        // Cascade to children. Snapshot the ids first so `s` can be
        // mutably borrowed inside the loop.
        let child_ids: Vec<NodeId> = s
            .children
            .get(&id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        for child_id in child_ids {
            let dep_idx = s.require_node(child_id).dep_index_of(id);
            // Child does not list `id` among its deps — nothing to mark.
            let Some(idx) = dep_idx else { continue };
            // Mark this child's per-dep terminal slot. Take a retain on
            // the error handle for the slot share.
            {
                let child = s.require_node_mut(child_id);
                if child.dep_records[idx].terminal.is_some() {
                    // Idempotent — child already saw this dep terminate.
                    continue;
                }
                child.dep_records[idx].terminal = Some(t);
            }
            if let TerminalKind::Error(h) = t {
                self.binding.retain_handle(h);
            }
            // Auto-cascade gating: if all deps now terminal, push child
            // onto the work queue with the chosen terminal.
            //
            // Slice C-1: kinds that opt out of Lock 2.B (currently
            // `Operator(Reduce)`) intercept upstream COMPLETE so they
            // can emit their accumulator before terminating. Instead of
            // cascading, queue the child for fn-fire — `fire_operator`
            // sees `dep_records[0].terminal` set and emits the
            // appropriate batch (Data(acc) + Complete on COMPLETE,
            // Error(h) on ERROR).
            let action = {
                let child = s.require_node(child_id);
                if child.terminal.is_some() {
                    ChildAction::None // Already terminated — no-op.
                } else if child.all_deps_terminal() {
                    if child.skips_auto_cascade() {
                        ChildAction::QueueFire
                    } else {
                        ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
                    }
                } else {
                    // At least one dep is still live — child stays live.
                    ChildAction::None
                }
            };
            match action {
                ChildAction::None => {}
                ChildAction::Cascade(t_child) => {
                    work.push((child_id, t_child));
                }
                ChildAction::QueueFire => {
                    // Q-beyond Sub-slice 2 (D108, 2026-05-09):
                    // pending_fires lives on per-thread WaveState.
                    crate::batch::with_wave_state(|ws| {
                        ws.pending_fires.insert(child_id);
                    });
                }
            }
        }
    }
}
3761}
3762
/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
///
/// Computed once per child after the child's per-dep terminal slot has
/// been marked, then acted on immediately.
enum ChildAction {
    /// No cascade; child is already terminal or not yet all-deps-terminal.
    None,
    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
    /// The child is pushed onto `terminate_node`'s work queue.
    Cascade(TerminalKind),
    /// Queue child for fn-fire instead of cascading. Used by operator
    /// kinds that intercept upstream terminal (Operator(Reduce)).
    QueueFire,
}
3773
3774/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
3775/// ERROR seen wins. Caller has already verified all deps are terminal.
3776fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
3777 for dr in dep_records {
3778 if let Some(TerminalKind::Error(h)) = dr.terminal {
3779 return TerminalKind::Error(h);
3780 }
3781 }
3782 TerminalKind::Complete
3783}
3784
3785// -----------------------------------------------------------------------
3786// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
3787// -----------------------------------------------------------------------
3788
3789impl Core {
3790 /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
3791 ///
3792 /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
3793 /// terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3794 /// `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3795 /// bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3796 /// to async iterators and other consumers that wait on terminal
3797 /// delivery.
3798 /// - **Idempotent on duplicate delivery.** The per-node
3799 /// `has_received_teardown` flag is set on the first call; subsequent
3800 /// `teardown` calls (or cascade visits from other paths) are silent
3801 /// no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3802 /// - **Cascade downstream.** Each child is recursively torn down. The
3803 /// child's own COMPLETE auto-cascades from `terminate_node`'s logic
3804 /// (Lock 2.B); its TEARDOWN comes from this cascade.
3805 ///
3806 /// # Panics
3807 ///
3808 /// Panics if `node_id` is unknown.
3809 pub fn teardown(&self, node_id: NodeId) {
3810 {
3811 let s = self.lock_state();
3812 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3813 }
3814 let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3815 let torn_down_for_wave = torn_down.clone();
3816 // TEARDOWN cascade follows `s.children` AND `meta_companions`
3817 // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
3818 // Phase E `compute_touched_partitions(node_id)` (called by
3819 // `run_wave_for`) walks both axes so the wave acquires every
3820 // partition reachable via the cascade.
3821 self.run_wave_for(node_id, move |this| {
3822 let mut s = this.lock_state();
3823 let collected = this.teardown_inner(&mut s, node_id);
3824 torn_down_for_wave.lock().extend(collected);
3825 });
3826 // Fire NodeTornDown for every cascaded id (root + metas +
3827 // downstream consumers that auto-cascaded). Outside the state
3828 // lock, matching fire_topology_event discipline.
3829 let ids = std::mem::take(&mut *torn_down.lock());
3830 for id in ids {
3831 self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3832 }
3833 }
3834
3835 /// Iterative teardown walk (Slice A-bigger, M1-close).
3836 ///
3837 /// The recursive shape was:
3838 /// ```text
3839 /// teardown(n):
3840 /// if torn_down: return
3841 /// mark torn_down
3842 /// for meta in metas: teardown(meta)
3843 /// terminate_node + queue Teardown
3844 /// for child in children: teardown(child)
3845 /// ```
3846 /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3847 ///
3848 /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3849 /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3850 /// if already), then pushes (in reverse order so LIFO pops in forward
3851 /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3852 /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3853 /// self, then children" ordering is preserved by the push order:
3854 /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3855 /// pops and runs `terminate_node` + queue `Teardown`; then children
3856 /// pop. Idempotency via `has_received_teardown` keeps each node
3857 /// visited at most once even when multi-parent diamonds re-enter via
3858 /// a sibling path.
3859 fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3860 enum Action {
3861 Visit(NodeId),
3862 EmitTeardown(NodeId),
3863 }
3864 let mut stack: Vec<Action> = vec![Action::Visit(root)];
3865 // Topology accumulator: every node that actually emits TEARDOWN
3866 // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
3867 // for already-torn-down nodes short-circuit on idempotency).
3868 let mut torn_down: Vec<NodeId> = Vec::new();
3869 while let Some(action) = stack.pop() {
3870 match action {
3871 Action::Visit(id) => {
3872 if s.require_node(id).has_received_teardown {
3873 continue; // Idempotent (R2.6.4).
3874 }
3875 s.require_node_mut(id).has_received_teardown = true;
3876 // Push order: children first (pop LAST), then
3877 // EmitTeardown(id), then metas (pop FIRST). Reverse
3878 // each list so within-group order matches the original
3879 // recursive iteration.
3880 let children: Vec<NodeId> = s
3881 .children
3882 .get(&id)
3883 .map(|c| c.iter().copied().collect())
3884 .unwrap_or_default();
3885 for &child in children.iter().rev() {
3886 stack.push(Action::Visit(child));
3887 }
3888 stack.push(Action::EmitTeardown(id));
3889 let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
3890 for &meta in metas.iter().rev() {
3891 stack.push(Action::Visit(meta));
3892 }
3893 }
3894 Action::EmitTeardown(id) => {
3895 // Auto-prepend COMPLETE if not yet terminal. The (now
3896 // iterative) terminate_node handles auto-cascade to
3897 // children's own terminal slots per Lock 2.B.
3898 let already_terminal = s.require_node(id).terminal.is_some();
3899 if !already_terminal {
3900 self.terminate_node(s, id, TerminalKind::Complete);
3901 }
3902 // Wire emission of the TEARDOWN itself (tier 6).
3903 self.queue_notify(s, id, Message::Teardown);
3904 torn_down.push(id);
3905 }
3906 }
3907 }
3908 torn_down
3909 }
3910
3911 /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
3912 /// Meta companions are nodes whose lifecycle is bound to the parent's
3913 /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
3914 /// down first.
3915 ///
3916 /// Use this for inspection / audit / sidecar nodes that subscribe to
3917 /// parent state — without the ordering, the companion could observe
3918 /// the parent mid-destruction and emit garbage.
3919 ///
3920 /// Idempotent on duplicate registration of the same companion.
3921 ///
3922 /// # Lifecycle constraint
3923 ///
3924 /// Intended for **setup-time** wiring — call this before `parent` or
3925 /// `companion` enters a wave. Mid-wave registration (especially during
3926 /// a teardown cascade in flight) is implementation-defined: the new
3927 /// edge takes effect on the *next* wave. Adding a companion to a
3928 /// torn-down parent silently no-ops (the parent will not tear down
3929 /// again). For dynamic companion attachment with deterministic
3930 /// ordering, prefer constructing the wiring before subscribers exist.
3931 ///
3932 /// # Panics
3933 ///
3934 /// Panics if either node id is unknown, or if `parent == companion`
3935 /// (a node cannot be its own meta companion — would loop on TEARDOWN).
3936 pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
3937 assert!(parent != companion, "node cannot be its own meta companion");
3938 let mut s = self.lock_state();
3939 assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
3940 assert!(
3941 s.nodes.contains_key(&companion),
3942 "unknown companion {companion:?}"
3943 );
3944 let metas = &mut s.require_node_mut(parent).meta_companions;
3945 if !metas.contains(&companion) {
3946 metas.push(companion);
3947 }
3948 }
3949}
3950
3951// -----------------------------------------------------------------------
3952// INVALIDATE — cache clear + downstream cascade
3953// -----------------------------------------------------------------------
3954
3955impl Core {
3956 /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
3957 /// dependents per canonical spec §1.4.
3958 ///
3959 /// Semantics:
3960 /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
3961 /// the call is a no-op — no cache to clear, no INVALIDATE emitted.
3962 /// This naturally provides idempotency within a wave: once a node has
3963 /// been invalidated this wave (cache = NO_HANDLE), a second invalidate
3964 /// on the same node does nothing.
3965 /// - **Cache clear (immediate):** the node's cached handle is dropped
3966 /// (refcount released), `cache` becomes `NO_HANDLE`. State nodes
3967 /// keep `has_fired_once` per spec — INVALIDATE is not a re-gating
3968 /// event (the next emission to a previously-fired state still does
3969 /// not re-trigger the first-run gate; that's a resubscribable-terminal
3970 /// lifecycle concern, separate slice).
3971 /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
3972 /// normal pause-aware notify path. Buffers while paused, flushes
3973 /// immediately otherwise.
3974 /// - **Downstream cascade:** for each child of this node, the child's
3975 /// `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
3976 /// value referenced a now-released handle). The child is then
3977 /// recursively invalidated (no-op if its cache was already
3978 /// `NO_HANDLE`). This re-closes the child's first-run gate — fn
3979 /// won't fire again until the upstream re-emits a value.
3980 ///
3981 /// Wraps in a fresh wave when called from outside a wave, so
3982 /// notifications flush at the natural wave boundary.
3983 ///
3984 /// # Panics
3985 ///
3986 /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
3987 pub fn invalidate(&self, node_id: NodeId) {
3988 {
3989 let s = self.lock_state();
3990 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3991 }
3992 // INVALIDATE cascade follows `s.children` (in-partition by union-
3993 // find construction). Slice Y1 / Phase E.
3994 self.run_wave_for(node_id, |this| {
3995 let mut s = this.lock_state();
3996 this.invalidate_inner(&mut s, node_id);
3997 });
3998 }
3999
4000 /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4001 ///
4002 /// The recursive shape was a depth-first cache-clear walk:
4003 /// ```text
4004 /// invalidate(n):
4005 /// if cache(n) == NO_HANDLE: return // already-invalidated guard
4006 /// cache(n) = NO_HANDLE; release handle
4007 /// queue Invalidate(n)
4008 /// for child in children:
4009 /// child.dep_handles[idx] = NO_HANDLE
4010 /// invalidate(child)
4011 /// ```
4012 /// Deep linear chains overflowed the OS thread stack. The work-queue
4013 /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4014 /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4015 /// the never-populated / already-invalidated guard provides natural
4016 /// idempotency for diamond fan-in.
4017 fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
4018 let mut work: Vec<NodeId> = vec![root];
4019 while let Some(node_id) = work.pop() {
4020 // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4021 // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4022 // also does NOT fire — natural fallout of skipping via the
4023 // cache==NO_HANDLE guard (we never reach the queue-push below).
4024 let old_handle = s.require_node(node_id).cache;
4025 if old_handle == NO_HANDLE {
4026 continue;
4027 }
4028 // Clear cache + release the handle's slot ownership.
4029 s.require_node_mut(node_id).cache = NO_HANDLE;
4030 self.binding.release_handle(old_handle);
4031 // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4032 // queue OnInvalidate cleanup hook for lock-released drain at
4033 // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4034 // node firing even if a node re-populates mid-wave (via fn-fire
4035 // emit) and gets re-invalidated through a separate path. Pure
4036 // cache==NO_HANDLE idempotency (above) catches "still at
4037 // sentinel" only; the explicit set is the strict R1.3.9.b
4038 // reading.
4039 // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4040 // `invalidate_hooks_fired_this_wave` and
4041 // `deferred_cleanup_hooks` both live on per-thread WaveState.
4042 // Single borrow handles the dedup-insert and (on first
4043 // insertion) the cleanup-hook push.
4044 crate::batch::with_wave_state(|ws| {
4045 if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4046 ws.deferred_cleanup_hooks
4047 .push((node_id, CleanupTrigger::OnInvalidate));
4048 }
4049 });
4050 // Wire emission. Pause-aware via queue_notify.
4051 self.queue_notify(s, node_id, Message::Invalidate);
4052 // Cascade: for each child, clear the dep record's prev_data
4053 // referencing this node and push child onto the work queue.
4054 let child_ids: Vec<NodeId> = s
4055 .children
4056 .get(&node_id)
4057 .map(|c| c.iter().copied().collect())
4058 .unwrap_or_default();
4059 for child_id in child_ids {
4060 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4061 if let Some(idx) = dep_idx {
4062 // Reset the child's dep record — the handle was just
4063 // released. Subsequent first-run-gate checks see
4064 // sentinel and re-close.
4065 //
4066 // Snapshot prev_data + data_batch retains for deferred
4067 // release, then clear the record. Two-phase to satisfy
4068 // the borrow checker (nodes + deferred_handle_releases
4069 // are separate CoreState fields).
4070 let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4071 let dr = &s.require_node(child_id).dep_records[idx];
4072 (dr.prev_data, dr.data_batch.clone())
4073 };
4074 {
4075 // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4076 // deferred_handle_releases moved to per-thread
4077 // WaveState thread_local. State lock held; the
4078 // thread_local borrow is independent.
4079 crate::batch::with_wave_state(|ws| {
4080 if old_prev != NO_HANDLE {
4081 ws.deferred_handle_releases.push(old_prev);
4082 }
4083 for h in batch_hs {
4084 ws.deferred_handle_releases.push(h);
4085 }
4086 });
4087 }
4088 let dr = &mut s.require_node_mut(child_id).dep_records[idx];
4089 dr.prev_data = NO_HANDLE;
4090 dr.data_batch.clear();
4091 work.push(child_id);
4092 }
4093 }
4094 }
4095 }
4096}
4097
4098// -----------------------------------------------------------------------
4099// PAUSE / RESUME — multi-pauser lockset + replay buffer
4100// -----------------------------------------------------------------------
4101
/// Reported back from [`Core::resume`] when the final lock releases.
///
/// `replayed` is the number of tier-3/tier-4 messages dispatched to
/// subscribers as part of the drain. `dropped` is the number of messages
/// that fell out the front of the buffer due to the Core-global
/// `pause_buffer_cap` while this pause cycle was active. A non-zero
/// `dropped` indicates a controller held the lock long enough to overflow
/// the cap; the binding may want to surface a warning or error.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ResumeReport {
    /// Count of buffered messages handed to subscribers on the final
    /// resume. `Core::resume` saturates this at `u32::MAX` if the
    /// buffer length does not fit in a `u32`.
    pub replayed: u32,
    /// Count of messages dropped during the pause cycle, as reported by
    /// the pause state when its last lock is removed.
    pub dropped: u32,
}
4115
4116impl Core {
4117 /// Acquire a pause lock on `node_id`. The first lock transitions the
4118 /// node from `Active` to `Paused`; further locks add to the lockset.
4119 /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4120 /// messages buffer in the node's pause buffer; other tiers flush
4121 /// immediately.
4122 ///
4123 /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4124 /// convention, R1.2.6 silent on the case).
4125 pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4126 let mut s = self.lock_state();
4127 let rec = s
4128 .nodes
4129 .get_mut(&node_id)
4130 .ok_or(PauseError::UnknownNode(node_id))?;
4131 // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4132 // this check, a stale pause-controller calling pause() on an
4133 // already-terminated node would re-arm `pause_state` to Paused.
4134 // The terminate_node path collapses pause_state → Active and
4135 // drains the buffer (A3-related), but doesn't gate subsequent
4136 // pause() calls. Treat as idempotent no-op (consistent with how
4137 // emit/complete/error early-return on terminal).
4138 if rec.terminal.is_some() {
4139 return Ok(());
4140 }
4141 // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4142 // dispatcher ignores PAUSE for this node — tier-3 flushes
4143 // immediately, fn fires immediately. Treat the call as a successful
4144 // no-op so callers don't need to special-case.
4145 if rec.pausable == PausableMode::Off {
4146 return Ok(());
4147 }
4148 rec.pause_state.add_lock(lock_id);
4149 Ok(())
4150 }
4151
4152 /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4153 /// node transitions back to `Active` and the buffered messages are
4154 /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4155 /// when the final lock released; `None` if the lockset is still
4156 /// non-empty (further locks held).
4157 ///
4158 /// Releasing an unknown `lock_id` (or releasing on an already-Active
4159 /// node) is an idempotent no-op returning `None`.
4160 pub fn resume(
4161 &self,
4162 node_id: NodeId,
4163 lock_id: LockId,
4164 ) -> Result<Option<ResumeReport>, PauseError> {
4165 // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4166 // sink Arcs. For default-mode nodes whose `pending_wave` was set
4167 // during pause, schedule a single fn-fire by adding to
4168 // `pending_fires` BEFORE we exit the lock — the wave engine picks
4169 // it up on the next drain tick.
4170 let (sinks, messages, dropped, pending_wave_for_default) = {
4171 let mut s = self.lock_state();
4172 let rec = s
4173 .nodes
4174 .get_mut(&node_id)
4175 .ok_or(PauseError::UnknownNode(node_id))?;
4176 // For Off mode, pause/resume are no-ops by construction.
4177 if rec.pausable == PausableMode::Off {
4178 return Ok(None);
4179 }
4180 let was_default_mode = rec.pausable == PausableMode::Default;
4181 // Capture pending_wave BEFORE remove_lock collapses the state.
4182 let pending_wave = if was_default_mode {
4183 rec.pause_state.take_pending_wave()
4184 } else {
4185 false
4186 };
4187 let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4188 // Not the final-resume — restore the pending_wave flag we
4189 // tentatively cleared, since we're not transitioning to
4190 // Active yet.
4191 if pending_wave {
4192 rec.pause_state.mark_pending_wave();
4193 }
4194 return Ok(None);
4195 };
4196 let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4197 let messages: Vec<Message> = buffer.into_iter().collect();
4198 // Default-mode pending-wave handling: schedule the fn-fire so
4199 // the wave engine consolidates the pause-window dep deliveries
4200 // into one fn execution. State nodes don't fire fn (no
4201 // `pending_fires` membership has effect for them).
4202 //
4203 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4204 // on per-thread WaveState.
4205 if pending_wave && was_default_mode {
4206 crate::batch::with_wave_state(|ws| {
4207 ws.pending_fires.insert(node_id);
4208 });
4209 }
4210 (sinks, messages, dropped, pending_wave && was_default_mode)
4211 };
4212 let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4213
4214 // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4215 // messages. Default-mode resume produces no buffered replay (the
4216 // consolidated fn-fire produces fresh wave traffic via the standard
4217 // commit_emission path).
4218 if !messages.is_empty() {
4219 for sink in &sinks {
4220 sink(&messages);
4221 }
4222 // Phase 3: balance the retain_handle calls done at buffer-push
4223 // time — sinks observe values but don't own refcount shares.
4224 for msg in &messages {
4225 if let Some(h) = msg.payload_handle() {
4226 self.binding.release_handle(h);
4227 }
4228 }
4229 }
4230
4231 // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4232 // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4233 // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4234 // drain pipeline; the new fn-fire emerges as a normal wave's worth
4235 // of messages to subscribers.
4236 if pending_wave_for_default {
4237 self.run_wave_for(node_id, |_this| {
4238 // The pending_fires entry was pushed in Phase 1 under the
4239 // lock. run_wave's drain picks it up.
4240 });
4241 }
4242 Ok(Some(ResumeReport { replayed, dropped }))
4243 }
4244
4245 /// True if the node currently holds at least one pause lock.
4246 #[must_use]
4247 pub fn is_paused(&self, node_id: NodeId) -> bool {
4248 self.state
4249 .lock()
4250 .require_node(node_id)
4251 .pause_state
4252 .is_paused()
4253 }
4254
4255 /// Number of pause locks currently held on `node_id`. `0` if Active.
4256 #[must_use]
4257 pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4258 self.state
4259 .lock()
4260 .require_node(node_id)
4261 .pause_state
4262 .lock_count()
4263 }
4264
4265 /// Test helper: whether `node_id` currently holds the given `lock_id`.
4266 #[must_use]
4267 pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4268 self.state
4269 .lock()
4270 .require_node(node_id)
4271 .pause_state
4272 .contains_lock(lock_id)
4273 }
4274}
4275
4276// -----------------------------------------------------------------------
4277// set_deps — atomic dep mutation
4278// -----------------------------------------------------------------------
4279
4280/// Errors returnable by [`Core::set_deps`].
4281///
4282/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4283/// Phase 13.8 Q1 lock:
4284/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4285/// explicit fixed-point semantics, which GraphReFly does not provide).
4286/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4287/// The `path` field reports the offending dep chain for debuggability.
4288/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4289/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4290/// terminal stream is a category error (terminal is one-shot at this
4291/// layer; recovery is the resubscribable path on a fresh subscribe).
4292/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4293/// Resubscribable terminal deps are accepted because the subscribe path
4294/// resets their lifecycle. Non-resubscribable terminal deps would deliver
4295/// their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4296/// which is rarely intended.
4297#[derive(Error, Debug, Clone, PartialEq)]
4298pub enum SetDepsError {
4299 /// `n` appeared in `new_deps` (self-loop rejection).
4300 #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
4301 SelfDependency { n: NodeId },
4302
4303 /// Adding the new dep would create a cycle. `path` is the chain
4304 /// `[added_dep, ..., n]` reachable via existing deps.
4305 #[error(
4306 "set_deps({n:?}, ...): cycle would form via path {path:?} \
4307 (adding {added_dep:?} → {n:?} closes the loop)"
4308 )]
4309 WouldCreateCycle {
4310 n: NodeId,
4311 added_dep: NodeId,
4312 path: Vec<NodeId>,
4313 },
4314
4315 #[error("set_deps: unknown node {0:?}")]
4316 UnknownNode(NodeId),
4317
4318 #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
4319 NotComputeNode(NodeId),
4320
4321 /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
4322 /// is rejected — the stream has ended at this layer. To recover, mark
4323 /// the node resubscribable before terminate; a fresh subscribe will then
4324 /// reset its lifecycle.
4325 #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
4326 TerminalNode { n: NodeId },
4327
4328 /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
4329 /// Q1, this is rejected; resubscribable terminal deps are allowed
4330 /// because the subscribe path resets them when activated. Already-present
4331 /// terminal deps are unaffected (their terminal status was accepted at
4332 /// the time they terminated).
4333 #[error(
4334 "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
4335 either mark it resubscribable before terminate, or remove the dep from new_deps"
4336 )]
4337 TerminalDep { n: NodeId, dep: NodeId },
4338
4339 /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
4340 /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
4341 /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
4342 /// snapshotted `dep_handles` BEFORE the lock-released callback; the
4343 /// callback returning a `tracked` set indexed against THAT ordering
4344 /// would corrupt indices if the rewire re-orders deps mid-fire.
4345 /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
4346 ///
4347 /// Workaround: schedule the rewire from a different node's fn (via
4348 /// `Core::emit` on a state node and observing the emit downstream),
4349 /// or perform the rewire after the wave completes (e.g. from a sink
4350 /// callback that is itself outside any fn-fire scope).
4351 ///
4352 /// Slice F (2026-05-07) — A6.
4353 #[error(
4354 "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
4355 (set_deps from inside the firing node's own fn would corrupt the \
4356 Dynamic `tracked` indices snapshot taken before invoke_fn). \
4357 Schedule the rewire outside this fire scope."
4358 )]
4359 ReentrantOnFiringNode { n: NodeId },
4360
4361 /// `set_deps(n, ...)` would trigger a partition migration (union or
4362 /// split in the per-subgraph union-find registry) that affects the
4363 /// partition of a node currently mid-fire on this thread. Distinct
4364 /// from [`Self::ReentrantOnFiringNode`]: that variant rejects
4365 /// `set_deps(n, ...)` where `n` itself is firing; this variant
4366 /// rejects `set_deps(n, ...)` on some OTHER node whose union/split
4367 /// shifts a firing node's partition root mid-wave.
4368 ///
4369 /// Why this matters: Y1's wave engine holds an
4370 /// [`Arc<crate::subgraph::SubgraphLockBox>`] for the firing node's
4371 /// partition for the wave's duration. A union mid-wave swaps the
4372 /// box-identity for one of the two affected partitions; a split
4373 /// (Y1+ post-Phase-F) extracts a fresh box for the orphan side.
4374 /// Either way the held Arc would diverge from the registry's
4375 /// current root for that partition, so the wave would lose
4376 /// serialization against the box's true partition mid-flight.
4377 ///
4378 /// Per [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
4379 /// Q3 = (a-strict): mid-wave migration is rejected at edge-mutation
4380 /// time. If a real consumer surfaces pressure to support mid-wave
4381 /// migration, lift via state-migration logic in a follow-up — but
4382 /// the v1 contract is "the partition a wave runs in cannot change
4383 /// shape mid-flight."
4384 ///
4385 /// `n` is the node whose `set_deps` was rejected; `firing` is the
4386 /// concretely-identified firing node whose partition would be
4387 /// migrated. Workaround: schedule the rewire outside the wave
4388 /// (e.g. emit a state-change that triggers `set_deps` from a sink
4389 /// callback running post-flush).
4390 ///
4391 /// Slice Y1 (D3 / D091, 2026-05-08).
4392 #[error(
4393 "set_deps({n:?}, ...): rejected — would migrate the partition of \
4394 currently-firing node {firing:?} mid-wave (union/split during \
4395 fire would invalidate the held wave_owner Arc). Schedule the \
4396 rewire outside the wave."
4397 )]
4398 PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
4399}
4400
4401impl Core {
4402 /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4403 /// cascading and without losing cache.
4404 ///
4405 /// Per the TLA+-verified design at
4406 /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4407 /// (35,950 distinct states, all 7 invariants clean):
4408 ///
4409 /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4410 /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4411 /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4412 /// - Status auto-settles if dirtyMask becomes empty.
4413 /// - Idempotent on `new_deps == current deps`.
4414 /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4415 /// - Cycles rejected (`WouldCreateCycle`).
4416 /// - Allowed mid-wave + while paused.
4417 /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4418 /// terminal non-resubscribable deps rejected (`TerminalDep`).
4419 ///
4420 /// The body is a single atomic dep-mutation transaction with several
4421 /// discrete validation stages. Splitting would require passing a
4422 /// partially-mutable CoreState across helpers, and the transaction's
4423 /// locality is what makes the F1 refcount-leak collection work.
4424 #[allow(clippy::too_many_lines)]
4425 pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4426 let mut s = self.lock_state();
4427 // Validate node exists and is compute. Read-once via the helper so
4428 // subsequent code can use `require_node(n)` without re-checking.
4429 let (is_state, is_producer, is_terminal) = {
4430 let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
4431 (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
4432 };
4433 if is_state || is_producer {
4434 // State and Producer nodes have no declared deps — set_deps
4435 // is meaningless. Producer nodes manage their own subscriptions
4436 // through the binding's ProducerCtx; mutating their (empty)
4437 // dep set would not affect that.
4438 return Err(SetDepsError::NotComputeNode(n));
4439 }
4440 // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
4441 // cannot be rewired; recovery is via resubscribable subscribe).
4442 if is_terminal {
4443 return Err(SetDepsError::TerminalNode { n });
4444 }
4445 // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
4446 // currently mid-fire on the wave-owner thread. Closes the D1 hazard
4447 // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
4448 // ordering and `Phase 3` would store the returned `tracked` indices
4449 // against post-rewire ordering. Same-thread re-entry is the only
4450 // path that matters — cross-thread emits already block on
4451 // `wave_owner` per the M1 design.
4452 // /qa F2 reverted (2026-05-10): currently_firing lives on
4453 // CoreState (per-Core, cross-thread visible). The D1 reentrance
4454 // check requires the same-thread visibility (a fn re-entering
4455 // set_deps on its own firing node), and the P13 cross-thread
4456 // check requires cross-thread visibility (Thread B's set_deps
4457 // observing Thread A's firing pushes during A's lock-released
4458 // invoke_fn). Per-Core placement on shared CoreState delivers
4459 // both. Read under the already-held state lock.
4460 if s.currently_firing.contains(&n) {
4461 return Err(SetDepsError::ReentrantOnFiringNode { n });
4462 }
4463 // Self-rewire rejection.
4464 if new_deps.contains(&n) {
4465 return Err(SetDepsError::SelfDependency { n });
4466 }
4467 // Validate all new deps exist.
4468 for &d in new_deps {
4469 if !s.nodes.contains_key(&d) {
4470 return Err(SetDepsError::UnknownNode(d));
4471 }
4472 }
4473 // Cycle detection: data flows parent → child via the `children` map.
4474 // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
4475 // `d` is already reachable from `n` via existing data-flow edges
4476 // (so `n → ... → d` exists, and the new `d → n` closes the loop).
4477 // DFS from `n` along `children` edges, looking for each added dep.
4478 let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
4479 let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
4480 let added: HashSet<NodeId> = new_deps_set.difference(¤t_deps).copied().collect();
4481 for &d in &added {
4482 if let Some(path) = self.path_from_to(&s, n, d) {
4483 return Err(SetDepsError::WouldCreateCycle {
4484 n,
4485 added_dep: d,
4486 path,
4487 });
4488 }
4489 }
4490 // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
4491 // resubscribable. Resubscribable terminal deps are allowed — the
4492 // subscribe path resets their lifecycle when something activates
4493 // them. Already-present (kept) deps are unaffected; their terminal
4494 // status was accepted at the time they terminated.
4495 for &d in &added {
4496 let dep_rec = s.require_node(d);
4497 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
4498 return Err(SetDepsError::TerminalDep { n, dep: d });
4499 }
4500 }
4501 // Compute `removed` early (Phase F: needs to be available for P13
4502 // split-case widening below). Idempotent fast-path moved below the
4503 // P13 check accordingly.
4504 let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
4505
4506 // Slice Y1 (D3 / D091 — P13, 2026-05-08): reject mid-wave set_deps
4507 // that would shift a currently-firing node's partition root.
4508 // Distinct from `ReentrantOnFiringNode` (same-node case, line above).
4509 // Holds the registry lock briefly under the state lock per the
4510 // P12-fix lock-discipline invariant `state lock → registry mutex`.
4511 //
4512 // **Two cases:**
4513 // 1. **Union (Phase D)** — adding a cross-partition dep merges two
4514 // components. Both pre-merge components are affected (the
4515 // smaller-rank loser's box is dropped, its members migrate to
4516 // the winner's root).
4517 // 2. **Split (Phase F, 2026-05-09)** — removing an edge whose
4518 // removal disconnects the dep graph splits one component into
4519 // two. The pre-split component is affected (every member
4520 // re-unions; the orphan side gets a fresh `SubgraphLockBox`
4521 // while the keep side preserves the original Arc).
4522 //
4523 // Either case migrates the partition root (and box-identity) for
4524 // affected nodes mid-wave; if any node currently firing on this
4525 // thread is in an affected partition, the wave's held
4526 // `Arc<SubgraphLockBox>` would diverge from the registry's new
4527 // canonical box. Q3 = (a-strict) per the D3 design lock rejects
4528 // both cases at edge-mutation time.
4529 // /qa F2 reverted (2026-05-10): currently_firing lives on
4530 // CoreState (cross-thread visible — load-bearing for this P13
4531 // partition-migration check, which detects cross-thread set_deps
4532 // calls during another thread's lock-released invoke_fn).
4533 // Snapshot under the already-held state lock so the registry
4534 // mutex acquire below doesn't need to also hold the snapshot
4535 // borrow.
4536 let currently_firing_snapshot: Vec<NodeId> = s.currently_firing.clone();
4537 if !currently_firing_snapshot.is_empty() && (!added.is_empty() || !removed.is_empty()) {
4538 let mut reg = self.registry.lock();
4539 // Snapshot firing nodes' partitions. `partition_of` is mutating
4540 // (path compression) but partition IDENTITY is stable across
4541 // reads (only `union_nodes` / `split_partition` mutate roots).
4542 let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> =
4543 currently_firing_snapshot
4544 .iter()
4545 .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
4546 .collect();
4547 if !firing_with_partition.is_empty() {
4548 let part_n = reg.partition_of(n);
4549 // Case 1 (union): for each added dep, check cross-partition merge.
4550 for &added_dep in &added {
4551 let part_added = reg.partition_of(added_dep);
4552 if part_n == part_added {
4553 continue; // same-partition add is a no-op in union-find
4554 }
4555 let affected = [part_n, part_added];
4556 if let Some(&(firing, _)) = firing_with_partition
4557 .iter()
4558 .find(|(_, p)| affected.contains(&Some(*p)))
4559 {
4560 return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4561 }
4562 }
4563 // Case 2 (split): for each removed dep, simulate undirected
4564 // walk from `removed_dep` skipping the would-be-removed edge
4565 // (`removed_dep → n`); if `n` is unreachable, removal would
4566 // disconnect — split — affecting all nodes in that
4567 // partition. Since dep edges are within a single partition
4568 // by construction (union-find merges on edge add), every
4569 // node currently in the partition is affected.
4570 //
4571 // QA-fix #4 (2026-05-09): pass `added_edges` as `extra_edges`
4572 // so a `set_deps` that simultaneously REMOVES one edge AND
4573 // ADDS another path isn't falsely rejected. Without this,
4574 // the pre-mutation walk doesn't see the would-be-added
4575 // edges and reports disconnect even when the net change
4576 // preserves connectivity.
4577 let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
4578 for &removed_dep in &removed {
4579 let part_removed = reg.partition_of(removed_dep);
4580 let visited = walk_undirected_dep_graph(
4581 &s,
4582 removed_dep,
4583 Some((removed_dep, n)),
4584 &added_edges,
4585 );
4586 let would_disconnect = !visited.contains(&n);
4587 if would_disconnect {
4588 if let Some(&(firing, _)) = firing_with_partition
4589 .iter()
4590 .find(|(_, p)| Some(*p) == part_removed)
4591 {
4592 return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4593 }
4594 }
4595 }
4596 }
4597 }
4598 // Idempotent fast-path. Now safe to short-circuit since the P13
4599 // check above already considered both `added` and `removed`.
4600 if added.is_empty() && removed.is_empty() {
4601 return Ok(());
4602 }
4603
4604 // Snapshot old deps (ordered) for topology event, before mutation.
4605 let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
4606
4607 // Carry out the rewire atomically.
4608 // 1. Build new dep_records, preserving DepRecord state for kept deps.
4609 let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
4610 //
4611 // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
4612 // slot owns a refcount share retained at `terminate_node` time. When a
4613 // dep is REMOVED, its slot is dropped — the corresponding handle's
4614 // share must be released here, otherwise it leaks until Core drop.
4615 // Also release data_batch retains for removed deps.
4616 let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
4617 let rec = s.require_node(n);
4618 // Index old dep_records by NodeId for O(1) lookup of kept deps.
4619 let old_by_node: HashMap<NodeId, &DepRecord> =
4620 rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
4621 let new_records: Vec<DepRecord> = new_deps_vec
4622 .iter()
4623 .map(|&d| {
4624 if let Some(old) = old_by_node.get(&d) {
4625 // Kept dep: preserve all state (prev_data, data_batch,
4626 // terminal, wave flags). Subscriptions stay live.
4627 DepRecord {
4628 node: d,
4629 prev_data: old.prev_data,
4630 dirty: old.dirty,
4631 involved_this_wave: old.involved_this_wave,
4632 data_batch: old.data_batch.clone(),
4633 terminal: old.terminal,
4634 }
4635 } else {
4636 // Added dep: fresh sentinel record.
4637 DepRecord::new(d)
4638 }
4639 })
4640 .collect();
4641 // Collect handles to release from REMOVED dep records.
4642 let mut to_release: Vec<HandleId> = Vec::new();
4643 for d in &removed {
4644 if let Some(old) = old_by_node.get(d) {
4645 if let Some(TerminalKind::Error(h)) = old.terminal {
4646 to_release.push(h);
4647 }
4648 // Release data_batch retains for removed deps.
4649 for &h in &old.data_batch {
4650 to_release.push(h);
4651 }
4652 }
4653 }
4654 (new_records, to_release)
4655 };
4656 // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
4657 // currently model a per-dep dirtyMask explicitly (we use the boolean
4658 // `dirty` flag at node level). Removing a dep's entry from the implicit
4659 // mask is therefore implicit — by removing the dep, future emissions
4660 // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
4661 // stays wave-scoped and gets cleared at wave end. The setDeps action
4662 // itself does NOT change the dirty boolean unless all deps are cleared;
4663 // in that case we settle.
4664 // Slice E2 (D067): on a dynamic node that had previously fired its
4665 // fn, capture `has_fired_once` BEFORE the reset so we can fire
4666 // `OnRerun` cleanup lock-released after `drop(s)` below. Without
4667 // this, the next `fire_regular` Phase 1 would capture
4668 // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
4669 // silently dropping the prior activation's cleanup closure when
4670 // the next `invoke_fn` overwrites `current_cleanup`. Per spec
4671 // R2.4.5, `set_deps` does NOT end the activation cycle
4672 // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
4673 // fire on every re-fire including post-set_deps.
4674 let fire_set_deps_on_rerun;
4675 {
4676 let rec = s.require_node_mut(n);
4677 fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
4678 rec.dep_records = new_dep_records;
4679 // Re-derive `tracked` for static derived: all indices.
4680 // For dynamic: clear `tracked` AND reset `has_fired_once` so the
4681 // next dep delivery satisfies the first-fire branch in
4682 // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
4683 // Without resetting `has_fired_once`, the cleared `tracked` blocks
4684 // every future fire — fn never re-runs and the dynamic node sits
4685 // on stale cache derived from the old dep set. The next fire
4686 // re-runs fn unconditionally; fn's returned `tracked` then
4687 // repopulates `rec.tracked` and normal selective-deps semantics
4688 // resume from the next dep update onward.
4689 if rec.is_dynamic {
4690 rec.tracked.clear();
4691 rec.has_fired_once = false;
4692 } else {
4693 // Derived (static) and Operator track all deps.
4694 rec.tracked = (0..new_deps_vec.len()).collect();
4695 }
4696 }
4697
4698 // 2. Update inverted-edge map (children).
4699 for &removed_dep in &removed {
4700 if let Some(set) = s.children.get_mut(&removed_dep) {
4701 set.remove(&n);
4702 }
4703 }
4704 for &added_dep in &added {
4705 s.children.entry(added_dep).or_default().insert(n);
4706 }
4707
4708 // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
4709 // wave so any downstream propagation runs cleanly. We capture only
4710 // the LIST of added deps (not their cache values) because the cache
4711 // can change between releasing the validation lock and the wave's
4712 // re-acquisition — see the P2 race fix below.
4713 //
4714 // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
4715 // closure re-acquiring the lock, a concurrent thread could
4716 // invalidate one of the added deps, releasing its cache handle. A
4717 // pre-snapshot of `(added_dep, cache)` pairs would then carry a
4718 // dangling HandleId into `deliver_data_to_consumer`. The fix is to
4719 // re-read each added dep's `cache` INSIDE the closure (under the
4720 // freshly re-acquired state lock). The wave-owner re-entrant mutex
4721 // (Q2) blocks concurrent waves once we enter `run_wave`, so the
4722 // re-read sees a coherent post-validation state.
4723 let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
4724 // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
4725 // membership BEFORE dropping the state lock so the registry can
4726 // never lag behind topology mutations as observed by concurrent
4727 // readers. Lock-order invariant `state lock → registry mutex`
4728 // (one-way; never registry → state) — see the matching block in
4729 // `Core::register` for the full rationale.
4730 // - For each new edge: union the partitions of `n` and `added_dep`.
4731 // - For each removed edge (Slice Y1 / Phase F, 2026-05-09):
4732 // run undirected-dep-graph BFS from `removed_dep` over the
4733 // POST-removal `s.children` + `dep_records`. If `n` is
4734 // unreachable, the partition has split — gather the affected
4735 // component nodes + intra-component edges, then call
4736 // [`SubgraphRegistry::split_partition`] to migrate the orphan
4737 // side onto a fresh `SubgraphLockBox`. Mid-fire splits would
4738 // have been rejected at the P13 check above (Q3 = (a-strict)).
4739 {
4740 let mut reg = self.registry.lock();
4741 for &added_dep in &added {
4742 reg.union_nodes(n, added_dep);
4743 }
4744 for &removed_dep in &removed {
4745 // Post-removal walk — `s.children[removed_dep]` no longer
4746 // contains `n`, and `s.nodes[n].dep_records` no longer
4747 // contains `removed_dep`. No skip needed; no extra edges
4748 // (added edges are already applied to `s.children` and
4749 // `dep_records` by the time we reach this block).
4750 let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
4751 if visited.contains(&n) {
4752 // Still connected via other dep edges — no split.
4753 continue;
4754 }
4755 // Disconnected. `visited` is the keep-side (containing
4756 // `removed_dep`). Identify the original component, the
4757 // intra-component dep edges, and split.
4758 let original_root = reg.find(removed_dep);
4759 // Snapshot keys before iterating — `find` mutates via
4760 // path compression; iterating + mutating concurrently
4761 // would alias-borrow.
4762 let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
4763 let component_nodes: Vec<NodeId> = snapshot_keys
4764 .into_iter()
4765 .filter(|&node| reg.find(node) == original_root)
4766 .collect();
4767 let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
4768 // Collect dep edges within the component (post-removal).
4769 // Edge convention: `(parent, child)` data-flow direction.
4770 let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
4771 for &node in &component_nodes {
4772 if let Some(rec) = s.nodes.get(&node) {
4773 for d in rec.dep_records.iter().map(|r| r.node) {
4774 if component_set.contains(&d) {
4775 edges_in_component.push((d, node));
4776 }
4777 }
4778 }
4779 }
4780 let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
4781 reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
4782 // Marker call kept for symmetry with `union_nodes` — the
4783 // registry's `on_edge_removed` is itself a no-op (Phase F
4784 // moved the actual work into Core where the dep-graph
4785 // view is available).
4786 reg.on_edge_removed(n, removed_dep);
4787 }
4788 }
4789 // Drop the state lock before run_wave (which acquires its own) and
4790 // before crossing the binding boundary for the F1 refcount-fix
4791 // releases. Keeps the lock-discipline split (binding calls outside
4792 // the state lock) consistent with the rest of the dispatcher.
4793 drop(s);
4794 // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
4795 // that had previously fired. The cleanup closure cleans up
4796 // resources tied to the old dep shape before the next fn-fire
4797 // (triggered by added-dep push-on-subscribe below) registers a
4798 // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
4799 // because set_deps may NOT enter a wave (no added deps → no
4800 // run_wave below) — queueing the hook would orphan it until the
4801 // next unrelated wave drains.
4802 if fire_set_deps_on_rerun {
4803 self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
4804 }
4805 // Fire topology event after lock is dropped.
4806 self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
4807 node: n,
4808 old_deps: old_deps_vec,
4809 new_deps: new_deps_vec.clone(),
4810 });
4811 if !added_for_wave.is_empty() {
4812 // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
4813 // touched partitions. Added deps are now unioned with `n`
4814 // (Phase C P12 fix moved registry mutation inside the state
4815 // lock), so any cascade through them stays in `n`'s partition
4816 // set as walked by `compute_touched_partitions`.
4817 self.run_wave_for(n, |this| {
4818 let mut s = this.lock_state();
4819 // Defensive: re-validate `n` still exists and isn't terminal.
4820 // A concurrent path could have terminated it between
4821 // validation and run_wave_for's partition-lock acquisition.
4822 if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
4823 return;
4824 }
4825 for added_dep in &added_for_wave {
4826 // Re-read cache under the wave-owner-held lock — this
4827 // is the post-validation, post-concurrent-action
4828 // snapshot. NO_HANDLE means the dep was invalidated
4829 // concurrently; skip (no data to push).
4830 let cache = match s.nodes.get(added_dep) {
4831 Some(rec) => rec.cache,
4832 None => continue, // dep deleted concurrently
4833 };
4834 if cache == NO_HANDLE {
4835 continue;
4836 }
4837 let dep_idx = s.require_node(n).dep_index_of(*added_dep);
4838 if let Some(idx) = dep_idx {
4839 this.deliver_data_to_consumer(&mut s, n, idx, cache);
4840 }
4841 }
4842 });
4843 }
4844 for h in removed_handles {
4845 self.binding.release_handle(h);
4846 }
4847 Ok(())
4848 }
4849
4850 /// DFS from `from` along data-flow edges (children map) looking for `to`.
4851 /// Returns the path including endpoints, or `None` if unreachable. Used
4852 /// for cycle detection in [`Self::set_deps`].
4853 fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
4854 if from == to {
4855 return Some(vec![from]);
4856 }
4857 let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
4858 let mut visited: HashSet<NodeId> = HashSet::new();
4859 while let Some((cur, path)) = stack.pop() {
4860 if !visited.insert(cur) {
4861 continue;
4862 }
4863 if cur == to {
4864 return Some(path);
4865 }
4866 if let Some(children) = s.children.get(&cur) {
4867 for &child in children {
4868 let mut new_path = path.clone();
4869 new_path.push(child);
4870 stack.push((child, new_path));
4871 }
4872 }
4873 }
4874 None
4875 }
4876}
4877
4878// CoreState helpers — kept on the inner struct so they're naturally scoped
4879// to the lock guard.
4880impl CoreState {
4881 fn alloc_node_id(&mut self) -> NodeId {
4882 let id = NodeId::new(self.next_node_id);
4883 self.next_node_id += 1;
4884 id
4885 }
4886
4887 fn alloc_sub_id(&mut self) -> SubscriptionId {
4888 let id = SubscriptionId(self.next_subscription_id);
4889 self.next_subscription_id += 1;
4890 id
4891 }
4892
4893 /// Clear wave-scoped flags and rotate per-dep batch data on every
4894 /// node. Run at the end of every wave (regular drain via `run_wave`,
4895 /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
4896 /// drain). Centralized so a future wave-state field can't be missed
4897 /// at one of the cleanup sites.
4898 ///
4899 /// Per-dep rotation (R2.9.b / R1.3.6.b):
4900 /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
4901 /// The last batch entry's retain transfers to `prev_data`; the old
4902 /// `prev_data`'s retain is released. All earlier batch entries are
4903 /// released.
4904 /// - `data_batch` cleared.
4905 /// - Per-dep `dirty` and `involved_this_wave` cleared.
4906 ///
4907 /// Handle releases are pushed to `deferred_handle_releases` for
4908 /// post-lock-drop release by the caller.
4909 pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
4910 // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
4911 // + `pending_pause_overflow` clears moved to
4912 // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
4913 // rotation below pushes batch-handle and prev_data releases into
4914 // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
4915 // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
4916 // borrows the WaveState thread_local; no lock-discipline rule
4917 // applies (state lock + thread_local borrow are independent).
4918 //
4919 // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4920 // `invalidate_hooks_fired_this_wave` clear moved to
4921 // [`crate::batch::WaveState::clear_wave_state`]. The
4922 // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
4923 // explicitly on success/panic paths) likewise moves with the
4924 // field.
4925 //
4926 // /qa F2 reverted (2026-05-10): `currently_firing` stays on
4927 // CoreState (per-Core, cross-thread visible — load-bearing for
4928 // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
4929 // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
4930 // a future code path that bypasses the guard would otherwise
4931 // leak a stale entry into the next wave).
4932 self.currently_firing.clear();
4933 //
4934 // Slice G tier3 emit tracking moved to per-partition state (Q3,
4935 // 2026-05-09); cleared by [`super::WaveOwnerGuard::drop`] on
4936 // outermost release for each partition the wave touched.
4937 for rec in self.nodes.values_mut() {
4938 rec.dirty = false;
4939 rec.involved_this_wave = false;
4940 for dr in &mut rec.dep_records {
4941 let batch_len = dr.data_batch.len();
4942 if batch_len > 0 {
4943 // Release all batch entries EXCEPT the last — the last
4944 // entry's retain transfers to prev_data.
4945 for &h in &dr.data_batch[..batch_len - 1] {
4946 ws.deferred_handle_releases.push(h);
4947 }
4948 // Release the OLD prev_data (its retain was from the
4949 // previous wave's rotation or from initial delivery).
4950 if dr.prev_data != NO_HANDLE {
4951 ws.deferred_handle_releases.push(dr.prev_data);
4952 }
4953 // Rotate: last batch entry becomes new prev_data.
4954 // Its retain carries over — no extra retain needed.
4955 dr.prev_data = dr.data_batch[batch_len - 1];
4956 dr.data_batch.clear();
4957 }
4958 dr.dirty = false;
4959 dr.involved_this_wave = false;
4960 }
4961 }
4962 }
4963
4964 pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
4965 self.nodes
4966 .get(&id)
4967 .unwrap_or_else(|| panic!("unknown node {id:?}"))
4968 }
4969
4970 pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
4971 self.nodes
4972 .get_mut(&id)
4973 .unwrap_or_else(|| panic!("unknown node {id:?}"))
4974 }
4975}
4976
4977/// Release every binding-side refcount share owned by this `CoreState`
4978/// when the last `Core` clone drops the inner Mutex.
4979///
4980/// Without this, every retained handle in `cache` / `terminal` Error /
4981/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
4982/// registry until process exit. Production bindings (napi-rs, pyo3,
4983/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
4984/// this cleanup.
4985///
4986/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
4987/// is the only call, and a panicking binding during cleanup would already
4988/// have been a problem in normal operation.
4989impl Drop for CoreState {
4990 fn drop(&mut self) {
4991 // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
4992 // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
4993 // drop naturally with the per-thread WaveState's lifetime; no
4994 // CoreState-side cleanup needed.
4995 // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
4996 // and `wave_cache_snapshots` moved to per-thread WaveState
4997 // thread_local. By outermost-BatchGuard-drop discipline both fields
4998 // are empty by the time CoreState drops (BatchGuard owns a Core
4999 // clone, so Core can't drop while a BatchGuard is in flight). Any
5000 // thread that ran a wave on this Core drained on its own outermost
5001 // BatchGuard; cross-Core thread_local sharing is fine because each
5002 // wave drains its own retains.
5003 //
5004 // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5005 // `pending_notify` likewise moved to per-thread WaveState. The
5006 // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5007 // payload_handle) is no longer reachable from `Drop for CoreState`:
5008 // by invariant, no wave is in flight when CoreState drops (BatchGuard
5009 // holds a Core clone), so the originating thread's WaveState
5010 // pending_notify is empty by then. Other threads' WaveStates are
5011 // unreachable from CoreState::drop anyway — they're per-thread
5012 // thread_locals scoped to whichever thread ran the wave. The
5013 // outermost `BatchGuard::drop` is the canonical drain point on both
5014 // success and panic paths; Drop for CoreState relies on that
5015 // discipline holding rather than re-implementing it.
5016
5017 // Per-node retained handles:
5018 // - `cache` (1 retain per non-NO_HANDLE state cache or
5019 // populated compute cache).
5020 // - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5021 // - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5022 // terminated-dep slot).
5023 // - `pause_state` paused buffer messages with payload handles
5024 // (1 retain per buffered Data/Error).
5025 for rec in self.nodes.values_mut() {
5026 if rec.cache != NO_HANDLE {
5027 self.binding.release_handle(rec.cache);
5028 }
5029 if let Some(TerminalKind::Error(h)) = rec.terminal {
5030 self.binding.release_handle(h);
5031 }
5032 for dr in &rec.dep_records {
5033 if let Some(TerminalKind::Error(h)) = dr.terminal {
5034 self.binding.release_handle(h);
5035 }
5036 // Release data_batch retains (in-flight wave data).
5037 for &h in &dr.data_batch {
5038 self.binding.release_handle(h);
5039 }
5040 // Release prev_data retain (cross-wave persistence).
5041 if dr.prev_data != NO_HANDLE {
5042 self.binding.release_handle(dr.prev_data);
5043 }
5044 }
5045 if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5046 for msg in buffer {
5047 if let Some(h) = msg.payload_handle() {
5048 self.binding.release_handle(h);
5049 }
5050 }
5051 }
5052 // Slice E1: release replay-buffer retains.
5053 for &h in &rec.replay_buffer {
5054 self.binding.release_handle(h);
5055 }
5056 // Operator scratch (Slice C-3, D026): generic per-operator
5057 // state struct. Each variant's release_handles releases the
5058 // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5059 // Last latest + default; Take/Skip/TakeWhile own no handles).
5060 if let Some(scratch) = rec.op_scratch.as_mut() {
5061 scratch.release_handles(&*self.binding);
5062 }
5063 }
5064
5065 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5066 // retain-holding fields (`wave_cache_snapshots`,
5067 // `deferred_handle_releases`, `pending_notify`) are drained by
5068 // outermost BatchGuard::drop (success + panic paths). See
5069 // comment above for the invariant.
5070 }
5071}