graphrefly_core/batch.rs
1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//! runs `op` lock-released, then drains all transitive fn-fires and
13//! flushes per-subscriber notifications. Each fn-fire iteration drops
14//! the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//! can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//! the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//! queueing + child propagation. `&self`-only; bracket-fires
20//! `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//! pause-buffer routing. Snapshots the subscriber list at first-touch-
23//! per-wave so late subscribers (installed mid-wave between drain
24//! iterations) don't receive duplicate deliveries from messages already
25//! queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//! the consumer for fn-fire if its tracked-deps set is satisfied.
28//! Called from `commit_emission`, plus `activate_derived` and
29//! `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//! discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//! user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//! `invalidate` / `complete` / `error` / `teardown` and run a nested
38//! wave (the existing `in_tick` re-entrance gate composes
39//! transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//! check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//! It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//! `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//! tier-split. Re-entrance from a handshake sink callback panics with
46//! the [`reentrance_guard`] diagnostic.
47
48use std::cell::{Cell, RefCell};
49use std::collections::HashMap;
50use std::rc::Rc;
51
52use ahash::AHashSet;
53use indexmap::map::Entry;
54use indexmap::IndexMap;
55
56use smallvec::SmallVec;
57
58use crate::boundary::{DepBatch, FnEmission, FnResult};
59use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
60use crate::message::Message;
61use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
62
63// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
64//
65// **Wave scope = the owner thread (S4, D246/D248/D249).** `Core` is
66// single-owner `!Send + !Sync`: every emit in a wave runs on the one
67// owner thread, and a wave is one uninterrupted owner-side drain
68// bounded above by the outermost `BatchGuard` drop. A per-thread
69// `AHashSet<NodeId>` is therefore the natural placement for "has node
70// X already emitted a tier-3 message in this wave?" — its lifetime
71// matches the wave's, with no cross-thread or cross-wave contamination
72// (there is no other thread driving this Core; the deleted
73// `wave_owner` `ReentrantMutex` / cross-thread-BLOCK model is gone
74// with S2c's §7 machinery).
75//
76// **History:** placed per-partition on `SubgraphLockBox::state` (Q3
77// v1), moved to a per-thread thread-local by the D1 patch
78// (2026-05-09) to survive mid-wave cross-thread `set_deps` partition
79// splits — a hazard that only existed under the now-deleted
80// shared-Core cross-thread model. Post-S4 the thread-local placement
81// is simply the owner thread's wave-scoped set.
82//
83// **Lifecycle:** populated by `Core::commit_emission` /
84// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
85// `BatchGuard` drop on the owner thread (both success and
86// panic-discard paths). Re-entrant nested waves on the same Core+
87// thread share the set — inner-wave emits add to the same set; the
88// outermost drop is the canonical clear point.
89thread_local! {
90 static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
91}
92
93// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
94//
95// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
96// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
97// identical to thread_local borrow_mut. The "mutex hop is slow" intuition
98// is wrong UNCONTENDED.
99// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
100// than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
101// cache-line bouncing on the lock state itself.
102// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
103// dominated by cache-line bouncing across cores, NOT by single-thread
104// mutex acquire overhead. Moving the four wave-scoped fields to a
105// per-thread thread_local eliminates the bounce point entirely.
106//
107// **Wave scope = the owner thread (S4, D246/D248/D249):** `Core` is
108// single-owner `!Send + !Sync` — exactly one owner thread drives a
109// given `Core`, and a wave is **one uninterrupted owner-side drain**
110// (the deleted cross-thread `wave_owner` `ReentrantMutex` / "cross-thread
111// emits BLOCK" model is gone with S2c's §7 machinery). There is no
112// cross-thread interleave to defend against; the only cross-`Core`
113// concurrency is host-native via *independent* per-worker Cores
114// (actor model), each with its own `WAVE_STATE`.
115//
116// **Lifecycle:** populated by `Core::commit_emission` /
117// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
118// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
119// releases any retained handles still in `wave_cache_snapshots` /
120// `deferred_handle_releases`. Defensive wave-start clear at outermost
121// owning BatchGuard entry guards against cargo's thread-reuse propagating
122// stale entries from a prior panicked-mid-wave test.
123thread_local! {
124 static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
125}
126
// Wave-ownership flag for the at-most-one Core this OS thread may own.
// Stores the active `Core::generation` (nonzero ⇒ owning a wave on that
// Core; `0` ⇒ no active Core). A slot holding a Core's generation means
// "this thread is currently inside an OWNING wave on that Core" — i.e.
// the outermost `BatchGuard` whose drop must run the drain. Replaces the
// former Core-global `CoreState::in_tick` bool.
133//
134// **Why a single-generation `Cell<u64>` (D252, S5, 2026-05-19).** Post-
135// D247/D248 `Core` is single-owner `!Send + !Sync`; one Core per OS
136// thread is the model. The earlier `AHashSet<u64>` keyed by
137// `Core::generation` defended against an owner thread holding a wave on
138// Core-A and *also* entering a wave on Core-B from a `DeferQueue`
139// closure (/qa F1, the cross-Core owner-side nesting case D047 fixed).
140// Under D248 single-owner that defense is theoretical — no in-tree call
141// path produces it (would require an owner-side `DeferFn` to capture &
142// drive a *second* `&Core`; the `Send` half of the seam is closed under
143// `Core: !Send`). Per D252 (user-locked 2026-05-19), the hashing +
144// allocation cost is replaced by a single `Cell<u64>` (1 word, no
145// alloc) and the "one Core per OS thread" model is locked in as a
146// **hard invariant** — `BatchGuard::claim_in_tick` panics fail-loud if
147// it observes a nonzero generation that doesn't match `self.generation`,
148// structurally rejecting cross-Core same-thread nesting rather than
149// relying on convention. Nested same-Core re-entry (/qa EC#3, LIVE) is
150// preserved: the matching `claim` returns `false` (slot already holds
151// our generation) so the nested guard's drop no-ops and the outer wave
152// drains. `0` is reserved as the sentinel and `Core::generation` is
153// `NonZeroU64` (the existing process-monotonic counter starts at 1) so
154// the sentinel cannot collide with a live Core.
155//
156// **No lock required.** `Cell` is `!Sync` and only the one owner thread
157// touches it — single-owner `!Sync` Core ⇒ no cross-thread reader.
158//
159// Stale slot: the owning `BatchGuard::drop` clears the cell on every
160// exit path — normal return, the closure-body-panic branch, AND the
161// drain-phase-panic `catch_unwind` arm (before `resume_unwind`). So
162// a slot can only be left stuck if `Drop` itself never runs:
163// `std::mem::forget(guard)` or a process abort without unwinding — both
164// out of contract (`BatchGuard` is `#[must_use]` + `!Send`). A stale
165// nonzero slot would trip the D252 panic-on-mismatch on the NEXT Core's
166// claim on this thread — surfaced loudly, not silently masked.
167//
168// History: this flag lived briefly per-thread (Q-beyond sub-slice 3),
169// was reverted to Core-global (/qa F1+F2), keyed per-(Core, thread) for
170// the deleted disjoint-cross-thread model (D047, 2026-05-15), and at S4
171// (D246/D248/D249) the disjoint-partition constraint was retired with
172// the §7 cross-thread machinery. D252/S5 (2026-05-19) collapses the
173// `AHashSet<u64>` to `Cell<u64>` and locks "one Core per OS thread" as
174// a hard invariant — reversing D047's per-(Core, thread) keying since
175// the cross-Core owner-side nesting case it defended is no in-tree path
176// under single-owner Core. See `docs/rust-port-decisions.md` D252 +
177// D047/D246 and `docs/migration-status.md` § "D246 S5".
thread_local! {
    // `0` is the "no active Core" sentinel; any other value is the
    // generation of the Core whose wave this thread currently owns.
    static IN_TICK_OWNED: Cell<u64> = const { Cell::new(0_u64) };
}
181
182/// Wave-scoped state previously held under [`Core::cross_partition`]'s
183/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
184/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
185/// `pending_notify`, 2026-05-09; Sub-slice 3 added `currently_firing`,
186/// `in_tick`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
187/// `pending_wipes`, `invalidate_hooks_fired_this_wave`, 2026-05-09).
188///
189/// All fields are populated and drained within one wave on the one
190/// owner thread. Cross-thread access is structurally impossible —
191/// `Core` is single-owner `!Send + !Sync` (S4/D248); there is no
192/// other thread driving this Core.
193///
194/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
195/// `deferred_handle_releases`, and `pending_notify` hold binding-side
196/// handle retains. They MUST be drained (and released through
197/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
198/// on success and panic paths. `pending_notify` holds one retain per
199/// payload-bearing message (one per `Message::payload_handle()`); the
200/// retains are taken in `Core::queue_notify` and balanced either by
201/// `flush_notifications` (success path: pushed into
202/// `deferred_handle_releases`) or directly in the panic-discard path of
203/// `BatchGuard::drop` (taken from `pending_notify` and released).
204///
205/// The thread_local has no `Drop` hook with access to a binding — a
206/// panic that bypasses `BatchGuard::drop` (e.g. panic OUTSIDE any batch)
207/// would leak retains until the thread exits OR the next outermost
208/// wave-start clear runs (which for safety we don't fire — clearing
209/// without releasing would double-leak by losing the retain). The
210/// defensive wave-start clear in `BatchGuard::begin_batch_with_guards`
211/// clears `pending_auto_resolve` + `pending_pause_overflow` +
212/// `pending_fires` (no retains) + `currently_firing` +
213/// `invalidate_hooks_fired_this_wave` (also no retains) but NOT the
214/// retain-holding fields — those must be empty by construction at
215/// outermost wave start (a prior wave's panic-discard path drained them,
216/// or a prior wave's success path drained them).
217pub(crate) struct WaveState {
218 /// Payload-handle releases owed for messages that landed in
219 /// `pending_notify` during this wave (one per `payload_handle()`).
220 /// `BatchGuard::drop` releases these after sinks fire and the lock
221 /// is dropped, balancing the retain done in `queue_notify`.
222 pub(crate) deferred_handle_releases: Vec<HandleId>,
223 /// Pre-wave cache snapshots used to restore state if the wave aborts
224 /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
225 /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
226 /// the wave started writing to it. The snapshotted handle holds a
227 /// retain (taken when the snapshot was inserted) so it stays alive
228 /// for restoration. On wave success, snapshots are drained and their
229 /// retains released. On wave abort, each cache slot is restored from
230 /// the snapshot and the original retain transfers to the cache slot.
231 pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
232 /// D291: nodes whose `rec.terminal` slot transitioned `None → Some(_)`
233 /// during this wave (via [`Core::terminate_node`]). The set is the
234 /// snapshot — no payload because the idempotent guard in `terminate_node`
235 /// (`if rec.terminal.is_some() { continue; }`) means the only valid
236 /// transition is `None → Some`, so restore is unconditionally
237 /// `rec.terminal = None`. ERROR-tier `TerminalKind::Error(h)` retains
238 /// are released lock-released on restore (matches the
239 /// `wave_cache_snapshots` discipline — slot owns the retain, restore
240 /// transfers ownership to the releases vec).
241 ///
242 /// Closes the R4.3.2 status-snapshot completeness gap for terminal
243 /// tiers (cross-track-ledger §1 D282 row, D290 follow-on "Case 5"):
244 /// pre-D291 a `ctx.down(src, [COMPLETE])` inside `batch()` + throw
245 /// would clear the wave's pending DATA/COMPLETE messages via
246 /// [`Self::discard_wave_cleanup`] but leave `rec.terminal = Some(_)`,
247 /// silently rejecting any post-rollback emit on the same node
248 /// (substrate's terminal-state guard).
249 ///
250 /// **Refcount discipline:** the set itself holds no handles. On
251 /// restore, the pre-transition `rec.terminal` slot's owned handle
252 /// (`Some(TerminalKind::Error(h))` only — `Complete` has no payload)
253 /// is taken out of the slot and pushed into the releases vec; the
254 /// caller releases it after dropping the state lock.
255 pub(crate) wave_terminal_snapshots: AHashSet<NodeId>,
256 /// D291: per-dep terminal slots that transitioned `None → Some(_)`
257 /// during this wave. Mirrors [`Self::wave_terminal_snapshots`] for
258 /// the cascade-to-children case in [`Core::terminate_node`]
259 /// (`child.dep_records[idx].terminal = Some(t)`, same idempotent
260 /// guard at the dep-record level).
261 ///
262 /// **D291 /qa D2 (2026-05-25): keyed on `(child_id, dep_node_id)`**,
263 /// NOT `(child_id, dep_idx)`. A mid-batch `Core::set_deps(child_id,
264 /// …)` regenerates `dep_records` and invalidates positional
265 /// indexes; re-keying on the dep's `NodeId` lets restore re-resolve
266 /// the (possibly-new) index via `child.dep_index_of(dep_node_id)`.
267 /// Pre-fix a set_deps-mid-batch + cascade-rollback would silently
268 /// zero the WRONG dep's terminal slot. Restore sets the slot back
269 /// to `None`; ERROR-tier handles taken from the slot are released
270 /// lock-released.
271 pub(crate) wave_dep_terminal_snapshots: AHashSet<(NodeId, NodeId)>,
272 /// Nodes that need an auto-Resolved at wave end if they don't receive
273 /// a tier-3+ message from their own commit_emission. Populated by
274 /// the RESOLVED child propagation in `commit_emission`. Drained by
275 /// the auto-resolve sweep in `drain_and_flush`.
276 pub(crate) pending_auto_resolve: AHashSet<NodeId>,
277 /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
278 /// [`Core::queue_notify`] when the pause buffer first overflows;
279 /// drained at wave-end after the lock-released call to
280 /// `BindingBoundary::synthesize_pause_overflow_error`.
281 ///
282 /// # Panic-discard trade-off (deliberate; D280 doc-lock, 2026-05-22)
283 ///
284 /// Spec R1.3.8.c specifies the ERROR-synthesis contract for the
285 /// success path; it is silent on panic-discard semantics. The
286 /// [`Core::drain_and_flush`] success path drains this queue and
287 /// fires synthesis. The [`BatchGuard::drop`] panic-discard path
288 /// clears the queue WITHOUT firing synthesis (via
289 /// [`Self::clear_wave_state`] below), so a queued overflow ERROR
290 /// diagnostic for a wave that also panicked is silently dropped.
291 /// The consumer-visible `ResumeReport.dropped` count IS preserved
292 /// (lives on `PauseState`, not here); only the synthesized ERROR
293 /// with `{ nodeId, droppedCount, configuredMax, lockHeldDurationMs }`
294 /// is lost for that specific wave.
295 ///
296 /// This is the load-bearing [`BatchGuard`] atomicity invariant —
297 /// "the wave didn't happen" — applied uniformly to every
298 /// retain-holding / refcount-discipline field across the panic
299 /// path. Adding an asymmetric ERROR-surfacing exception here
300 /// would weaken atomicity for every other invariant that relies
301 /// on it (cache restoration, dep-mask reset,
302 /// `pending_auto_resolve` clear, etc.). The fn panic itself is
303 /// also louder than the missing diagnostic — a process-level
304 /// signal (panic hook → stderr) survives the wave-discard.
305 ///
306 /// # Lift point (future consumer pressure only)
307 ///
308 /// If a consumer surfaces a real need for overflow diagnostics
309 /// across panic boundaries, the correct shape is a
310 /// **panic-survivable diagnostic side channel** (e.g., a
311 /// Core-level `on_panic_diagnostic` hook or a binding-layer
312 /// callback that the panic-discard path invokes BEFORE clearing
313 /// retain-holding state) — NOT bolting a surviving-ERROR
314 /// exception onto [`BatchGuard::drop`]'s atomicity contract. Mint
315 /// as a separate D-number under the D196 consumer-pressure gate
316 /// when the scenario materializes.
317 pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
318 /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
319 ///
320 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
321 /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
322 /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
323 /// child-cascade `QueueFire` branch, `activate_derived`'s producer
324 /// queueing, `resume`'s pending-wave consolidation, and operator
325 /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
326 /// `fire_regular` / `fire_operator` (each removes the firing node
327 /// before invoking).
328 pub(crate) pending_fires: AHashSet<NodeId>,
329 /// Per-node outgoing message buffer; flushed at wave end. Insertion-
330 /// ordered so flush order is deterministic — load-bearing for
331 /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
332 /// companion both have queued messages in the same wave, the meta
333 /// (queued first via `teardown_inner`'s recursion order) flushes
334 /// first.
335 ///
336 /// Each entry carries the per-wave subscriber snapshot taken at first
337 /// touch (Slice A close, M1: lock-released drain). Late subscribers
338 /// installed mid-wave between fn-fire iterations don't appear in
339 /// already-snapshotted entries; this is the load-bearing fix that
340 /// prevents duplicate-Data delivery when a handshake delivers the
341 /// post-commit cache and the wave's flush would otherwise also fire
342 /// to the same sink.
343 ///
344 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
345 /// `CoreState::pending_notify` to per-thread `WaveState`. The map
346 /// holds a payload-handle retain per payload-bearing message
347 /// (`Message::payload_handle()`); these MUST be released by the
348 /// outermost `BatchGuard::drop` (success path: through
349 /// `flush_notifications` → `deferred_handle_releases`; panic path:
350 /// directly in `BatchGuard::drop`'s panic branch).
351 pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
352 /// D217-AMEND-2 (2026-05-16): persistent spare for `pending_notify`,
353 /// ping-ponged with it at wave end so a fresh `IndexMap::default()`
354 /// (a new `ahash::RandomState` via `gen_hasher_seed`/`from_keys`
355 /// PLUS RawVec realloc churn on the next wave's `queue_notify`) is
356 /// NEVER constructed after thread init. The empirical attribution
357 /// (`examples/profile_st_emit.rs` + macOS `sample`) put the old
358 /// per-wave `mem::take(&mut pending_notify)` at ~1250 of ~4767
359 /// hot-path samples — the dominant §7 floor tax (D217 lever-1
360 /// "slab store" falsified; the node store is minor). Holds NO
361 /// retains between waves: it is always empty (cleared, capacity +
362 /// hasher retained) outside `flush_notifications`.
363 pub(crate) pending_notify_recycle: IndexMap<NodeId, PendingPerNode>,
364 // Q-beyond Sub-slice 3 (D108, 2026-05-09) moved `in_tick` and
365 // `currently_firing` from `CoreState` to per-thread `WaveState`;
366 // /qa F1+F2 (2026-05-10) reverted both to `CoreState`; the in_tick
367 // placement was finalized 2026-05-15 (D047) and then collapsed by
368 // D252 (S5, 2026-05-19) — see below. The two fields have *different*
369 // scope requirements:
370 //
371 // - **`in_tick` — one-Core-per-OS-thread `Cell<u64>` (D252).** Pure
372 // thread-local once broke cross-Core isolation (Core-A's flag
373 // leaked to Core-B on the same OS thread, /qa F1); pure Core-
374 // global broke the now-deleted disjoint-partition cross-thread
375 // drain. The intermediate D047 `AHashSet<u64>` keyed by
376 // `Core::generation` defended both. Under post-D248 single-owner
377 // Core the cross-Core owner-side nesting case has no in-tree
378 // consumer (would require an owner-side `DeferFn` to drive a
379 // *second* `&Core`, structurally absent), so D252 collapses the
380 // set to a single `Cell<u64>` slot per OS thread and panics
381 // fail-loud on cross-Core nesting. Same-Core nested re-entry
382 // (/qa EC#3) is preserved by the matching-generation branch in
383 // `BatchGuard::claim_in_tick`. NOT a `CoreState` field.
384 //
385 // - **`currently_firing` — Core-global (stays on `CoreState`).**
386 // Per-thread placement silently bypassed the cross-thread P13
387 // partition-migration check in `Core::set_deps`: thread B's set_deps
388 // must observe thread A's firing pushes. Per-Core (cross-thread
389 // visible) placement restores the D091 safety check (/qa F2).
390 //
391 // The other 11 wave-scoped fields stay per-thread because they're
392 // accessed only by the one owner thread (single-owner `!Send` Core,
393 // S4/D248 — no cross-thread emitter exists).
394 /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
395 /// for `OnInvalidate` cleanup hook firing. A node already in this
396 /// set this wave has already had its `OnInvalidate` queued into
397 /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
398 /// `invalidate_inner` re-encounters it.
399 ///
400 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
401 /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
402 /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
403 /// cleared by `WaveState::clear_wave_state`.
404 pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
405 /// Deferred sink-fire jobs collected by `flush_notifications`.
406 /// `flush_notifications` populates this from `pending_notify`;
407 /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
408 /// each entry lock-released. Each tuple is
409 /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
410 /// waves.
411 ///
412 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
413 /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
414 /// retains held — the `Vec<Sink>` clones own Arcs that drop
415 /// naturally; the `Vec<Message>` payload retains were already moved
416 /// into `deferred_handle_releases` by `flush_notifications`.
417 pub(crate) deferred_flush_jobs: DeferredJobs,
418 /// Slice E2 (per D060/D061): lock-released drain queue for
419 /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
420 /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
421 /// drained after the lock drops at wave boundary by
422 /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
423 /// D060). Panic-discarded silently per D061.
424 ///
425 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
426 /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
427 pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
428 /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
429 /// `BindingBoundary::wipe_ctx` calls fired eagerly from
430 /// `Core::terminate_node` when a resubscribable node terminates with
431 /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
432 /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
433 /// silently.
434 ///
435 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
436 /// `CoreState::pending_wipes` to per-thread `WaveState`.
437 pub(crate) pending_wipes: Vec<NodeId>,
438}
439
440impl WaveState {
441 fn new() -> Self {
442 Self {
443 deferred_handle_releases: Vec::new(),
444 wave_cache_snapshots: HashMap::new(),
445 // D291: empty by construction at outermost wave start; both
446 // success + panic-discard paths drain.
447 wave_terminal_snapshots: AHashSet::new(),
448 wave_dep_terminal_snapshots: AHashSet::new(),
449 pending_auto_resolve: AHashSet::new(),
450 pending_pause_overflow: Vec::new(),
451 pending_fires: AHashSet::new(),
452 pending_notify: IndexMap::new(),
453 // D217-AMEND-2: one `IndexMap::default()` for the thread's
454 // life — its ahash seed + capacity are recycled forever.
455 pending_notify_recycle: IndexMap::new(),
456 invalidate_hooks_fired_this_wave: AHashSet::new(),
457 deferred_flush_jobs: Vec::new(),
458 deferred_cleanup_hooks: Vec::new(),
459 pending_wipes: Vec::new(),
460 }
461 }
462
463 /// Wave-end clear of the non-retain-holding fields. Called from
464 /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
465 /// (`wave_cache_snapshots`, `deferred_handle_releases`,
466 /// `pending_notify`) are NOT cleared here — they follow the
467 /// success/panic paths' explicit drain discipline in
468 /// `BatchGuard::drop`.
469 pub(crate) fn clear_wave_state(&mut self) {
470 self.pending_auto_resolve.clear();
471 // pending_pause_overflow is normally drained by drain_and_flush
472 // via the synthesis loop. If a wave is panic-discarded BEFORE
473 // synthesis runs, BatchGuard::drop's panic path also clears it
474 // explicitly. Pre-wave defensive clear in
475 // `begin_batch_with_guards` makes this idempotent.
476 //
477 // D280 (2026-05-22): the deliberate trade-off — atomicity beats
478 // diagnostic on panic — is documented on the field declaration
479 // at `pending_pause_overflow`; lift-point (panic-survivable
480 // diagnostic side channel) gated on D196 consumer pressure.
481 self.pending_pause_overflow.clear();
482 // Sub-slice 2: pending_fires is intentionally NOT cleared
483 // here. Two reasons:
484 // 1. Wave-success drain empties it by construction: every
485 // `pick_next_fire` selection is removed by
486 // `fire_regular` / `fire_operator` before invocation,
487 // and `drain_and_flush` only exits when the set is empty.
488 // 2. The `Core::resume` default-mode consolidated-fire
489 // pattern stages an entry OUTSIDE any in-tick wave and
490 // then enters a new wave to drain it; clearing here
491 // would erase that pre-staged entry. The panic-discard
492 // path in `BatchGuard::drop` clears it explicitly.
493
494 // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
495 // CoreState::currently_firing — defensive clear there.
496 // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
497 // wave-scoped — cleared so the next wave can fire cleanups
498 // again.
499 self.invalidate_hooks_fired_this_wave.clear();
500 // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
501 // `pending_wipes` are intentionally NOT cleared here. They
502 // follow the same discipline as `deferred_handle_releases` /
503 // `pending_notify`:
504 // - SUCCESS path (`BatchGuard::drop` non-panic): drained by
505 // `Core::drain_deferred` AFTER `clear_wave_state` runs,
506 // then fired lock-released by `Core::fire_deferred`.
507 // - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
508 // `std::mem::take`-and-dropped AFTER `clear_wave_state`
509 // runs (silently per D061 / D069).
510 // Clearing here would race the success path: queued sink fires
511 // / cleanup hooks / wipes would be erased BEFORE
512 // `drain_deferred` could take them.
513 }
514}
515
516/// Run a closure with mutable access to this thread's [`WaveState`].
517///
518/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
519/// for sites that touch ONE field. For sites that interleave state lock
520/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
521/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
522/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
523/// pattern).
524///
525/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
526/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
527/// on nested borrow. The same discipline that the prior
528/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
529/// holding cross_partition) carries over.
530pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
531 WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
532}
533
/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
/// outermost owning entry. Mirrors the pre-existing tier3 defensive
/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
/// propagating stale entries from a prior panicked-mid-wave test.
///
/// Concretely, this function:
///
/// - **clears** `pending_auto_resolve`, `pending_pause_overflow` and
///   `invalidate_hooks_fired_this_wave`;
/// - **asserts empty (debug builds only)** `wave_cache_snapshots`,
///   `wave_terminal_snapshots`, `wave_dep_terminal_snapshots`,
///   `deferred_handle_releases`, `pending_notify` and
///   `pending_notify_recycle`;
/// - **leaves untouched** everything else (notably `pending_fires` — see
///   the inline rationale).
///
/// The retain-holding fields (`wave_cache_snapshots` /
/// `deferred_handle_releases`) MUST already be empty by construction at
/// outermost wave entry — outermost `BatchGuard::drop` always drains
/// them on both success and panic paths. If they're non-empty here it
/// indicates a prior wave bypassed `BatchGuard::drop`. Debug builds
/// panic on that via `debug_assert!`; release builds do not check, and
/// the next BatchGuard's outermost drop will eventually drain them.
fn wave_state_clear_outermost() {
    with_wave_state(|ws| {
        // /qa F4 (2026-05-10): debug_assert that retain-holding fields
        // are empty at outermost wave start. The invariant claim is
        // "outermost BatchGuard::drop drains them on both success and
        // panic paths, so they're empty before the next wave starts."
        // If a panic path EVER bypasses the drain (today: not reachable
        // because BatchGuard::drop is robust against panicking sinks via
        // catch_unwind), this assert catches it in tests immediately
        // rather than letting stale entries leak into the next wave's
        // drain (which would release Core-A's HandleIds via Core-B's
        // binding under cross-Core same-thread sequential use).
        debug_assert!(
            ws.wave_cache_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain (would leak retains into next wave's \
             binding). See /qa F4 (2026-05-10).",
            ws.wave_cache_snapshots.len()
        );
        // D291: same invariant for the terminal-slot snapshots. The set
        // holds no handles directly, but its drain triggers ERROR-handle
        // releases via `restore_wave_terminal_snapshots`; stale entries
        // would either silently restore `rec.terminal = None` on an
        // unrelated next wave (corrupting state) or skip a real release
        // (leaking refcounts). Outermost `BatchGuard::drop` drains both
        // snapshot sets on success (via
        // `Core::drain_wave_terminal_snapshots`) and on panic (via
        // `Core::restore_wave_terminal_snapshots`, reached from
        // `discard_wave_cleanup`).
        debug_assert!(
            ws.wave_terminal_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_terminal_snapshots non-empty \
             at outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain (would corrupt next wave's terminal slots). \
             See D291.",
            ws.wave_terminal_snapshots.len()
        );
        // D291: per-dep counterpart of the terminal-slot snapshot set;
        // same empty-at-wave-start invariant as above.
        debug_assert!(
            ws.wave_dep_terminal_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_dep_terminal_snapshots non-empty \
             at outermost wave start ({} entries). See D291.",
            ws.wave_dep_terminal_snapshots.len()
        );
        // /qa F4: remaining retain-holding queues — same invariant.
        debug_assert!(
            ws.deferred_handle_releases.is_empty(),
            "wave_state_clear_outermost: deferred_handle_releases non-empty \
             at outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.deferred_handle_releases.len()
        );
        debug_assert!(
            ws.pending_notify.is_empty(),
            "wave_state_clear_outermost: pending_notify non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.pending_notify.len()
        );
        // D217-AMEND-2 / QA: enforce the invariant the field's own doc
        // claims ("always empty outside `flush_notifications`"). The
        // recycle slot is cleared at the end of every `flush_notifications`
        // and drained on the panic-discard path (`discard_wave_cleanup`);
        // a non-empty slot here means a panic bypassed both — surface it
        // loudly in tests rather than silently injecting a prior wave's
        // stale entries into the next wave's `mem::swap`.
        debug_assert!(
            ws.pending_notify_recycle.is_empty(),
            "wave_state_clear_outermost: pending_notify_recycle non-empty \
             at outermost wave start ({} entries) — a panic bypassed both \
             flush_notifications' clear AND discard_wave_cleanup's drain. \
             See D217-AMEND-2 / QA (2026-05-16).",
            ws.pending_notify_recycle.len()
        );
        // Actual defensive clears start here; everything above only
        // checks invariants.
        ws.pending_auto_resolve.clear();
        ws.pending_pause_overflow.clear();
        // Sub-slice 2: pending_fires is intentionally NOT cleared here.
        // Pre-Sub-slice-2 it lived on CoreState and survived between
        // waves; load-bearing for `Core::resume`'s default-mode
        // consolidated-fire pattern, which inserts into pending_fires
        // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
        // false at that moment) and then calls `run_wave_for(node_id)`
        // — `run_wave_for` enters a NEW outermost wave whose drain must
        // pick up that pre-staged pending_fires entry. Clearing here
        // would erase it.
        //
        // pending_fires holds no retains, so a stale entry from a
        // prior panicked-mid-wave test that bypassed BatchGuard::drop
        // would leak as a spurious fire on the next wave on the same
        // thread (no refcount damage). The panic-discard path in
        // BatchGuard::drop and the wave-success drain together
        // guarantee pending_fires is empty by wave end; relying on
        // that invariant matches the pre-refactor lifecycle.
        //
        // Intentionally NOT clearing wave_cache_snapshots /
        // deferred_handle_releases / pending_notify here — those hold
        // retains and need a binding to release. Documented invariant:
        // they're empty by outermost wave start.

        // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
        // defensively clear the OnInvalidate dedup set on outermost-wave
        // entry. Holds no retains; a stale entry from a prior
        // panicked-mid-wave test that bypassed BatchGuard::drop would
        // only suppress the OnInvalidate cleanup hook for that node on
        // the next wave (no refcount damage). Clearing matches the
        // tier3 defensive-clear precedent.
        //
        // `currently_firing` was reverted to CoreState (per /qa F2 — the
        // per-thread placement silently bypassed the cross-thread P13
        // partition-migration check); its defensive clear lives in
        // `CoreState::clear_wave_state` (which BatchGuard::drop runs
        // wave-end on both success and panic paths).
        ws.invalidate_hooks_fired_this_wave.clear();
        // Intentionally NOT clearing deferred_flush_jobs /
        // deferred_cleanup_hooks / pending_wipes here — by invariant
        // they're empty at outermost wave start (drained on success
        // by drain_deferred → fire_deferred; drained on panic by
        // BatchGuard::drop's panic branch). Pre-clearing would race a
        // hypothetical wave that staged into them OUTSIDE in_tick
        // (none does today, but matching the deferred_handle_releases
        // / pending_notify discipline keeps the invariant uniform).
    });
}
667
668// §7 (D208–D211): the per-thread `PARTITION_CACHE` is DELETED. It existed
669// to amortize the union-find `compute_touched_partitions` BFS + registry
670// epoch round-trips across repeated same-seed emits. With static
671// user-declared scheduling groups there is no epoch and the
672// touched-group walk resolves group `Arc`s under a single uncontended
673// `group_locks` lock (and is entirely skipped for the all-`None` floor),
674// so the cache (and its ABA-avoidance generation keying) is unnecessary.
675
676/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
677/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
678/// wave-scope rationale.
679fn tier3_check(node: NodeId) -> bool {
680 TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
681}
682
683/// Mark `node` as having emitted a tier-3 message in the current wave on
684/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
685fn tier3_mark(node: NodeId) {
686 TIER3_EMITTED_THIS_WAVE.with(|s| {
687 s.borrow_mut().insert(node);
688 });
689}
690
691/// Wave-end clear of the per-thread tier3 tracker. Called from the
692/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
693/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
694/// invoke this — the outer wave is still in flight and inner-wave marks
695/// are part of the outer wave's Slice G coalescing state.
696fn tier3_clear() {
697 TIER3_EMITTED_THIS_WAVE.with(|s| {
698 s.borrow_mut().clear();
699 });
700}
701
/// Deferred sink-fire jobs collected during `flush_notifications`. Each
/// entry pairs a snapshot of the sink Arcs to fire with the messages to
/// deliver to them — one entry per (node × phase) cell with non-empty
/// content. Drained from `CoreState` and fired lock-released.
///
/// Tuple layout per entry: `.0` is the sink list, `.1` is the messages
/// to deliver to every sink in that list.
///
/// NOTE(review): comments elsewhere in this file list
/// `deferred_flush_jobs` among the per-thread wave-state fields; confirm
/// whether "drained from `CoreState`" above is still accurate.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
707
/// Lock-released drain payload of the wave's BatchGuard:
/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
/// Sliced into a type alias to satisfy `clippy::type_complexity`.
///
/// Positional fields:
///
/// 0. [`DeferredJobs`] — sink-fire jobs.
/// 1. `Vec<HandleId>` — handles to release.
/// 2. `Vec<(NodeId, CleanupTrigger)>` — OnInvalidate cleanup hooks, one
///    `(node, trigger)` pair per hook.
/// 3. `Vec<NodeId>` — nodes with a pending `wipe_ctx` fire.
pub(crate) type WaveDeferred = (
    DeferredJobs,
    Vec<HandleId>,
    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
    Vec<crate::handle::NodeId>,
);
718
/// One subscriber-snapshot epoch within a node's wave-end notification
/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
/// for the node in a wave, and a fresh batch is opened whenever the node's
/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
/// existing sink unsubscribes, or a handshake-time panic evicts an
/// orphaned sink). All messages within one batch flush to the same sink
/// list — the snapshot taken when the batch opened, frozen against
/// subsequent revision bumps.
///
/// Batches for one node are held, in opening order, by
/// [`PendingPerNode::batches`].
pub(crate) struct PendingBatch {
    /// `NodeRecord::subscribers_revision` value at the moment this batch
    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
    /// open-fresh-batch on every push.
    pub(crate) snapshot_revision: u64,
    /// Subscriber snapshot frozen at batch-open time. `SmallVec<[_; 1]>`
    /// inlines the common single-subscriber case (avoids heap alloc for
    /// the dominant 1-sink-per-node pattern in most reactive graphs).
    pub(crate) sinks: SmallVec<[Sink; 1]>,
    /// Messages queued to this batch, in arrival order. `SmallVec<[_; 3]>`
    /// inlines the common per-node-per-wave message set (DIRTY + DATA +
    /// optional RESOLVED) without heap allocation.
    pub(crate) messages: SmallVec<[Message; 3]>,
}
741
/// Per-node wave-end notification queue, structured as one or more
/// subscriber-snapshot epochs (`batches`). The common case (no
/// mid-wave subscribe / unsubscribe at this node) keeps a single
/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
///
/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
/// `(sinks, messages)` pair per node — the snapshot froze on first
/// `queue_notify` and was reused for every subsequent emit to the same
/// node in the wave. That caused the documented late-subscriber +
/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
/// between two emits to the same node was invisible to the second
/// emit's flush slice. The revision-tracked batch list resolves it:
/// late subs land in a fresh batch whose frozen snapshot includes them,
/// while pre-subscribe batches retain their original snapshot so the
/// new sub doesn't double-receive earlier emits via flush AND handshake.
pub(crate) struct PendingPerNode {
    /// Snapshot epochs for this node, oldest first. See [`PendingBatch`].
    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}
760
761impl PendingPerNode {
762 /// Iterate every queued message for this node across all batches in
763 /// arrival order. Used by R1.3.3.a invariant assertions and the
764 /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
765 /// reason about wave-content per node, not per batch.
766 pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
767 self.batches.iter().flat_map(|b| b.messages.iter())
768 }
769
770 /// Mutable counterpart for `iter_messages`. Used by
771 /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
772 /// entries to Data when a wave detects a multi-emit case after the
773 /// fact.
774 pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
775 self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
776 }
777}
778
/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
///
/// Pushes `node_id` onto the `currently_firing` stack held in the Core's
/// locked state (`lock_state().shared.currently_firing`) on construction,
/// and removes it again on Drop. [`Core::set_deps`] consults the stack and
/// rejects `set_deps(N, ...)` from inside N's own fn-fire with
/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
/// a different dep ordering than Phase-3's `tracked` storage.
///
/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
/// callbacks like `project_each` / `predicate_each`). Drop fires even
/// on panic, so the stack stays balanced under user-fn unwinds.
///
/// Membership semantics (NOT strict LIFO): the only consumer of
/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
/// `contains(&n)` — a set-membership test. Drop removes the right-most
/// entry equal to this guard's `node_id` via `rposition` + `swap_remove`.
/// For a stack like `[A, B, A]` (A's fn re-enters B, B's fn re-enters A),
/// dropping B's guard first removes B at index 1; `swap_remove` fills
/// that slot with the trailing A, leaving `[A, A]`. The physical order of
/// the remaining entries may therefore not match construction order, but
/// membership is preserved. If a future call site needs strict LIFO
/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
// D221 (F-b) floor-hardening, 2026-05-17: holds `&'a Core`, NOT an
// owned `core.clone()` (Core is no longer `Clone`). D246/S2c: the
// `group_locks`/`global_wave` Arcs are deleted; the lock-free
// single-owner `RefCell` floor pays zero per-fn-fire Arc tax. Both
// construction sites
// (`fire_regular` batch.rs:~1148, `fire_operator` batch.rs:~1719) are
// locals in a `&self` method, so the `self: &Core` borrow strictly
// outlives the guard — the lifetime is sound by construction (no
// escape, dropped within the same method). Removing the clone here also
// stops S2's `LockedCell` deletion from being able to reintroduce the
// tax.
pub(crate) struct FiringGuard<'a> {
    /// Core whose `currently_firing` stack this guard registered in.
    core: &'a Core,
    /// Node registered as firing for the lifetime of this guard.
    node_id: NodeId,
}
817
818impl<'a> FiringGuard<'a> {
819 pub(crate) fn new(core: &'a Core, node_id: NodeId) -> Self {
820 // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
821 // CoreState (cross-thread visible, restoring the D091 P13 check).
822 // Push under the state lock scope.
823 {
824 let mut s = core.lock_state();
825 s.shared.currently_firing.push(node_id);
826 }
827 Self { core, node_id }
828 }
829}
830
831impl Drop for FiringGuard<'_> {
832 fn drop(&mut self) {
833 // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
834 // CoreState. Pop under state lock.
835 {
836 let mut s = self.core.lock_state();
837 // Pop the right-most matching node_id (membership semantics —
838 // not strict LIFO). If absent, an external rebalance already
839 // popped — silent no-op (panic-in-Drop is poison).
840 if let Some(pos) = s
841 .shared
842 .currently_firing
843 .iter()
844 .rposition(|n| *n == self.node_id)
845 {
846 s.shared.currently_firing.swap_remove(pos);
847 }
848 }
849 }
850}
851
852/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
853/// uninitialized or the contained type doesn't match `T` — both are
854/// invariant violations for any `fire_op_*` helper that should only be
855/// called from `fire_operator`'s match arm for the matching variant.
856fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
857 s.require_node(node_id)
858 .op_scratch
859 .as_ref()
860 .expect("op_scratch slot uninitialized for operator node")
861 .as_any_ref()
862 .downcast_ref::<T>()
863 .expect("op_scratch type mismatch")
864}
865
866/// Mutable borrow of the per-operator scratch slot. Same invariants as
867/// [`scratch_ref`].
868fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
869 s.require_node_mut(node_id)
870 .op_scratch
871 .as_mut()
872 .expect("op_scratch slot uninitialized for operator node")
873 .as_any_mut()
874 .downcast_mut::<T>()
875 .expect("op_scratch type mismatch")
876}
877
878impl Core {
879 // -------------------------------------------------------------------
880 // Wave entry + drain
881 // -------------------------------------------------------------------
882
883 /// Wave entry. The caller passes a closure that performs the wave's
884 /// triggering operation (`commit_emission`, `terminate_node`, etc.).
885 /// The closure runs lock-released; closure-internal Core methods
886 /// acquire the state lock as they go.
887 ///
888 /// **Implementation:** delegates to [`Self::begin_batch`] for the
889 /// wave's RAII lifecycle. The returned `BatchGuard` claims `in_tick`
890 /// (`Core::generation`-keyed) and on drop runs the drain + flush +
891 /// sink-fire phases — OR, if the closure panicked, the
892 /// panic-discard path that restores cache snapshots and clears
893 /// in_tick. (S4/D248: the `wave_owner` re-entrant mutex is deleted —
894 /// single-owner `!Send` Core, one uninterrupted owner-side drain.)
895 /// This unification gives `run_wave` the same panic-safety
896 /// guarantee as the user-facing `Core::batch`.
897 ///
898 /// **Re-entrance:** a closure invoked from inside another wave — the
899 /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
900 /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
901 /// The outer wave's drain picks up the inner closure's queued work.
902 ///
903 /// **Lock-release discipline (Slice A close, M1):** all binding-side
904 /// callbacks except the subscribe-time handshake fire lock-released.
905 /// Sinks / user fns / custom-equals oracles that re-enter Core via
906 /// the owner-side mailbox/`DeferQueue` seam run a nested wave. The
907 /// one owner thread runs the in-flight drain to quiescence before
908 /// `emit` returns — preserving the user-facing "emit returning means
909 /// subscribers have observed" contract (no cross-thread lock needed;
910 /// there is no cross-thread emitter).
911 /// Wave entry with a known `seed` node. Acquires only the partitions
912 /// transitively touched from `seed` (downstream cascade via
913 /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
914 /// current partition. The canonical Y1 parallelism win for per-seed
915 /// entry points (`Core::emit`, `Core::subscribe`'s activation,
916 /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
917 /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
918 /// push-on-subscribe).
919 ///
920 /// S4/D248: single-owner `!Send + !Sync` Core — one uninterrupted
921 /// owner-side drain per wave; the deleted per-partition `wave_owner`
922 /// `ReentrantMutex` parallelism is replaced by host-native
923 /// concurrency across *independent per-worker Cores* (actor model).
924 /// The "emit returning means subscribers have observed" contract
925 /// holds because the one owner thread drains to quiescence before
926 /// returning.
927 ///
928 /// Slice Y1 / Phase E (2026-05-08).
929 pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
930 where
931 F: FnOnce(&Self),
932 {
933 let _guard = self.begin_batch_for(seed);
934 op(self);
935 }
936
937 /// Wave entry helper. D274 (2026-05-21): the
938 /// `Result<(), PartitionOrderViolation>` wrapper was deleted — groups
939 /// are static identity only post-D248/D253, single-owner per Core per
940 /// D246, one Core per OS thread per D252; partition-order violations
941 /// cannot fire.
942 pub(crate) fn try_run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
943 where
944 F: FnOnce(&Self),
945 {
946 let _guard = self.begin_batch_for(seed);
947 op(self);
948 }
949
950 /// Drain retains held by `wave_cache_snapshots` and return them so
951 /// the caller can release them lock-released. Called from the
952 /// wave-success path in [`BatchGuard::drop`].
953 ///
954 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
955 /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
956 /// drain-and-release-lock-released discipline (introduced as /qa A1
957 /// fix 2026-05-09 against the prior cross_partition mutex) carries
958 /// over: caller drains under WaveState borrow + state lock, releases
959 /// after both are dropped — `release_handle` may re-enter Core via
960 /// finalizers and re-entry under either guard would deadlock /
961 /// double-borrow.
962 #[must_use]
963 pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
964 if ws.wave_cache_snapshots.is_empty() {
965 return Vec::new();
966 }
967 std::mem::take(&mut ws.wave_cache_snapshots)
968 .into_values()
969 .collect()
970 }
971
972 /// Restore cache slots from `wave_cache_snapshots` and clear the map.
973 /// Called from the wave-abort path in `BatchGuard::drop` (panic).
974 ///
975 /// For each snapshotted node:
976 ///
977 /// 1. Read the current cache (the in-flight new value).
978 /// 2. Set `cache = old_handle` (the snapshot's retained value).
979 /// 3. Release the now-unowned current cache handle.
980 ///
981 /// Returns the list of "current" handles to release outside the lock.
982 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
983 /// to per-thread `WaveState`; signature takes both `s` (for cache
984 /// slots) and `ws` (for the snapshots map).
985 pub(crate) fn restore_wave_cache_snapshots(
986 &self,
987 s: &mut CoreState,
988 ws: &mut WaveState,
989 ) -> Vec<HandleId> {
990 if ws.wave_cache_snapshots.is_empty() {
991 return Vec::new();
992 }
993 let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
994 let mut releases = Vec::with_capacity(snapshots.len());
995 for (node_id, old_handle) in snapshots {
996 let Some(rec) = s.nodes.get_mut(&node_id) else {
997 releases.push(old_handle);
998 continue;
999 };
1000 let current = std::mem::replace(&mut rec.cache, old_handle);
1001 if current != NO_HANDLE {
1002 releases.push(current);
1003 }
1004 }
1005 releases
1006 }
1007
1008 /// D291: success-path drain for `wave_terminal_snapshots` +
1009 /// `wave_dep_terminal_snapshots`. Called from [`BatchGuard::drop`]'s
1010 /// success path (via the same wave-end drain that handles
1011 /// [`Self::drain_wave_cache_snapshots`]). On commit the wave's
1012 /// terminal mutations are kept, so this just clears the snapshot
1013 /// sets without touching `rec.terminal` / dep-record slots — those
1014 /// slots already own their ERROR-handle retains.
1015 pub(crate) fn drain_wave_terminal_snapshots(ws: &mut WaveState) {
1016 // D291 /qa A1 (2026-05-25): `AHashSet::clear()` on an empty set
1017 // is O(1) — the prior `is_empty()` guards were dead pre-checks.
1018 ws.wave_terminal_snapshots.clear();
1019 ws.wave_dep_terminal_snapshots.clear();
1020 }
1021
1022 /// D291: panic-path restore for `wave_terminal_snapshots` +
1023 /// `wave_dep_terminal_snapshots`. Mirrors
1024 /// [`Self::restore_wave_cache_snapshots`]: drains each snapshot,
1025 /// resets the corresponding slot to `None`, and returns the
1026 /// ERROR-tier `HandleId`s the caller must release lock-released.
1027 ///
1028 /// For each snapshotted node / `(node, dep_idx)`:
1029 /// 1. Take the current `Option<TerminalKind>` value (expected
1030 /// `Some(_)` because the snapshot was inserted at the
1031 /// `None → Some(_)` transition).
1032 /// 2. Set the slot back to `None`.
1033 /// 3. If the taken value was `TerminalKind::Error(h)`, push `h`
1034 /// into the releases vec so the caller can drop the slot's
1035 /// retain after the state lock is released.
1036 ///
1037 /// A snapshotted entry whose `rec` no longer exists (orphaned by a
1038 /// torn-down node) is silently skipped — no slot to restore.
1039 pub(crate) fn restore_wave_terminal_snapshots(
1040 &self,
1041 s: &mut CoreState,
1042 ws: &mut WaveState,
1043 ) -> Vec<HandleId> {
1044 let mut releases: Vec<HandleId> = Vec::new();
1045 if !ws.wave_terminal_snapshots.is_empty() {
1046 let snapshots = std::mem::take(&mut ws.wave_terminal_snapshots);
1047 releases.reserve(snapshots.len());
1048 for node_id in snapshots {
1049 let Some(rec) = s.nodes.get_mut(&node_id) else {
1050 // D291 /qa A10 (2026-05-25): fail-loud in debug
1051 // builds — no in-tree code path tears down a node
1052 // mid-wave after its terminal slot was snapshotted,
1053 // so an orphaned snapshot indicates either a future
1054 // in-wave node-drop seam (currently absent) or a
1055 // missed refcount transfer in a future slice.
1056 // Silent skip would leak the ERROR retain held by
1057 // the (now-gone) slot.
1058 debug_assert!(
1059 false,
1060 "D291 invariant: snapshotted node {node_id:?} was torn down \
1061 mid-wave — restore can't release the ERROR retain held by \
1062 the slot. /qa F6."
1063 );
1064 continue;
1065 };
1066 if let Some(TerminalKind::Error(h)) = rec.terminal.take() {
1067 releases.push(h);
1068 }
1069 // R2.6.4 (Lock 6.F) auto-COMPLETE pre-pend semantics —
1070 // see [`Core::teardown_inner`]. `has_received_teardown` is
1071 // NOT snapshotted in this slice (D291 scope is the
1072 // `terminal` slot per Case 5). Lift point: porting-deferred
1073 // §"D291 deferred-scope".
1074 }
1075 }
1076 if !ws.wave_dep_terminal_snapshots.is_empty() {
1077 let snapshots = std::mem::take(&mut ws.wave_dep_terminal_snapshots);
1078 releases.reserve(snapshots.len());
1079 for (child_id, dep_node_id) in snapshots {
1080 // D291 /qa D2 (2026-05-25): re-resolve the dep index by
1081 // dep `NodeId` so a mid-batch `set_deps(child_id, …)`
1082 // that regenerated `dep_records` doesn't zero the wrong
1083 // slot. `dep_index_of` returns `None` if the dep has
1084 // been removed entirely (rewire dropped it before
1085 // rollback) — that's fine, no slot to restore;
1086 // `set_deps`'s own refcount discipline already released
1087 // the slot's retain when the dep was removed.
1088 let Some(rec) = s.nodes.get_mut(&child_id) else {
1089 debug_assert!(
1090 false,
1091 "D291 invariant: snapshotted child {child_id:?} was torn down \
1092 mid-wave (dep {dep_node_id:?}) — restore can't release the \
1093 ERROR retain held by the slot. /qa F6."
1094 );
1095 continue;
1096 };
1097 let Some(idx) = rec.dep_index_of(dep_node_id) else {
1098 continue;
1099 };
1100 if let Some(TerminalKind::Error(h)) = rec.dep_records[idx].terminal.take() {
1101 releases.push(h);
1102 }
1103 }
1104 }
1105 releases
1106 }
1107
1108 /// Drain pending fires until quiescent, then flush wave-end notifications
1109 /// to subscribers. Each fire iteration drops the state lock around the
1110 /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
1111 ///
1112 /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
1113 /// and [`super::node::Core::activate_derived`] (via `run_wave`).
1114 pub(crate) fn drain_and_flush(&self) {
1115 let mut guard = 0u32;
1116 loop {
1117 // A′ (D232-AMEND): apply any producer-sink / timer
1118 // `MailboxOp`s **in-wave** at the top of every drain
1119 // iteration. Each applied op re-enters `Core::{emit,
1120 // complete,error}` with `in_tick = true` (non-owning batch
1121 // guard → it does NOT start its own drain; it queues into
1122 // this wave's `pending_fires`), so a sink that posted during
1123 // the previous `fire_fn` is picked up on the very next
1124 // iteration — immediate, in-wave, cascade-ordering-preserving
1125 // (consistent with the pre-S2b deferred-producer-op
1126 // drained-within-the-wave behaviour). Quiescence requires
1127 // BOTH `pending_fires` AND the mailbox empty.
1128 //
1129 // §7-floor guard (D-reflect, 2026-05-17): gate on the cheap
1130 // `runnable` atomic (one `Acquire` load) BEFORE touching the
1131 // mailbox `parking_lot::Mutex`. A no-producer / no-timer wave
1132 // (the `identity_dedup` ≈508 ns floor path) never posts, so
1133 // `runnable` stays `false` and this costs one relaxed-ish
1134 // atomic load per drain iteration — NOT a mutex lock + deque
1135 // pop. **Sink-side** posts are same-thread, in-wave: the
1136 // `runnable` Release precedes this Acquire on the same
1137 // thread, so no in-wave op is missed. **Task-side** posts
1138 // (timer/temporal `tokio::spawn` tasks) are cross-thread:
1139 // their happens-before is the `ops` `parking_lot::Mutex`
1140 // (acquire/release), NOT this atomic — `runnable` is only an
1141 // advisory fast-path bit there, and a racing task post is
1142 // caught by the embedder pump's *unconditional*
1143 // `drain_mailbox()` (timer tasks drain at the pump, not this
1144 // in-wave gate). Do NOT remove the unconditional pump drain.
1145 // This is exactly the per-group wake bit S4 wires to the
1146 // host executor — doing it here now is coherent, not
1147 // throwaway.
1148 if self.mailbox.is_runnable() || self.deferred.is_runnable() {
1149 self.drain_mailbox();
1150 }
1151
1152 // R1.3.8.c (Slice F, A3): if no fires are pending but there are
1153 // queued pause-overflow ERRORs, synthesize them now. The
1154 // resulting ERROR cascade may add to pending_fires (children
1155 // settling their terminal state), so we loop back to drain.
1156 //
1157 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
1158 // and pending_pause_overflow both live on per-thread
1159 // WaveState. State lock no longer required for either read.
1160 let synth_pending = with_wave_state(|ws| {
1161 if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
1162 std::mem::take(&mut ws.pending_pause_overflow)
1163 } else {
1164 Vec::new()
1165 }
1166 });
1167 for entry in synth_pending {
1168 // Lock-released call to the binding hook. Default impl
1169 // returns None — the binding has opted out of R1.3.8.c
1170 // and we fall back to silent-drop + ResumeReport.dropped.
1171 let handle = self.binding.synthesize_pause_overflow_error(
1172 entry.node_id,
1173 entry.dropped_count,
1174 entry.configured_max,
1175 entry.lock_held_ns / 1_000_000,
1176 );
1177 if let Some(h) = handle {
1178 // Re-enter Core::error to terminate the node and
1179 // cascade. We're inside a wave (`in_tick = true`),
1180 // so error() gets a non-owning batch guard — it
1181 // doesn't try to start its own drain. The cascade
1182 // queues into our outer drain via pending_fires
1183 // and pending_notify.
1184 self.error(entry.node_id, h);
1185 }
1186 }
1187
1188 // Pick next fire under a short lock. Also re-read the configured
1189 // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
1190 // without restarting waves mid-flight.
1191 //
1192 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1193 // on per-thread WaveState; pick_next_fire takes both state and
1194 // WaveState. The pending_size diagnostic and emptiness check
1195 // also read WaveState. Borrow scopes are split: WaveState
1196 // borrow drops before fire_fn runs (which re-borrows WaveState
1197 // via fire_regular / fire_operator).
1198 let (next, cap, pending_size) = {
1199 let s = self.lock_state();
1200 let cap = s.shared.max_batch_drain_iterations;
1201 let (next, pending_size) = with_wave_state(|ws| {
1202 if ws.pending_fires.is_empty() {
1203 return (None, 0);
1204 }
1205 let size = ws.pending_fires.len();
1206 let next = Self::pick_next_fire(&s, ws);
1207 (next, size)
1208 });
1209 (next, cap, pending_size)
1210 };
1211 if pending_size == 0 {
1212 // QA F1 (2026-05-18): quiescence requires BOTH
1213 // `pending_fires` AND the mailbox empty — the asserted
1214 // invariant the old `if pending_size == 0 { break }`
1215 // did NOT enforce. If the mailbox still holds work,
1216 // `continue` so the next iteration's top-of-loop
1217 // `is_runnable()`-gated `drain_mailbox()` applies it
1218 // (an applied op either cascades into `pending_fires`
1219 // or, if terminal/no-op, leaves the mailbox empty so
1220 // the *next* check breaks). §7 floor: no producer/timer
1221 // ⇒ never posted ⇒ `is_runnable()` false ⇒ breaks on the
1222 // first empty `pending_fires` (one atomic load — already
1223 // in the floor budget).
1224 // QA P3 (2026-05-18): the mailbox-continue does NOT
1225 // count against the fire-cascade `cap`. `drain_mailbox`
1226 // already drains the FIFO to quiescence in ONE call
1227 // (re-posts during `apply` are popped by the same
1228 // `drain_into` loop), and the self-reposting-`Defer`
1229 // livelock is bounded INSIDE `drain_into` (its own
1230 // `max_ops`). Reaching here `is_runnable()` again means
1231 // genuinely-new cross-thread (timer) work — bounded by
1232 // external input, not a fire cycle — so counting it
1233 // against the fire `cap` would false-trip a production
1234 // panic on heavy producer/timer graphs.
1235 if self.mailbox.is_runnable() || self.deferred.is_runnable() {
1236 continue;
1237 }
1238 break;
1239 }
1240 guard += 1;
1241 assert!(
1242 guard < cap,
1243 "wave drain exceeded {cap} iterations \
1244 (pending_fires={pending_size}). Most likely cause: a runtime \
1245 cycle introduced by an operator that re-arms its own pending_fires \
1246 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
1247 itself, or a fn that calls Core::emit on a node whose fn fires \
1248 the original node again). Structural cycles via set_deps are \
1249 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
1250 only with concrete evidence the workload needs more iterations."
1251 );
1252 let Some(next) = next else { break };
1253 // fire_fn manages its own locking around invoke_fn.
1254 self.fire_fn(next);
1255 }
1256 // Auto-resolve sweep: nodes registered in pending_auto_resolve
1257 // by the RESOLVED child propagation need a Resolved if they didn't
1258 // fire and settle via their own commit_emission. Check pending_notify
1259 // for each candidate — if it has Dirty but no tier-3+ message, the
1260 // node never settled and needs auto-Resolved. Route through
1261 // queue_notify so paused nodes get the Resolved into their pause
1262 // buffer.
1263 let mut s = self.lock_state();
1264 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
1265 // and pending_notify both live on per-thread WaveState. /qa A5
1266 // fix (2026-05-09): explicit scope for the WaveState borrow so
1267 // it drops BEFORE the for-loop. Inside the loop, `queue_notify`
1268 // re-borrows WaveState for `pending_pause_overflow.push` /
1269 // `pending_notify` writes — re-entrance on RefCell::borrow_mut
1270 // would panic. Explicit scope makes the lifetime load-bearing.
1271 let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
1272 for node_id in candidates {
1273 let needs_resolve = with_wave_state(|ws| {
1274 ws.pending_notify
1275 .get(&node_id)
1276 .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
1277 });
1278 if needs_resolve {
1279 self.queue_notify(&mut s, node_id, Message::Resolved);
1280 }
1281 }
1282 // Final flush phase — populates deferred_flush_jobs
1283 // from pending_notify (already carries per-node sink snapshots).
1284 self.flush_notifications(&mut s);
1285 }
1286
1287 /// Pick the pending node with the lowest topological rank.
1288 ///
1289 /// Nodes with lower `topo_rank` have no transitive upstream in
1290 /// `pending_fires` (by construction — `topo_rank = 1 + max dep rank`).
1291 /// This is O(|pending_fires|) instead of the prior O(N·V) BFS.
1292 /// §10 perf optimization (D047, Slice U).
1293 ///
1294 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` lives on
1295 /// per-thread `WaveState`. Caller passes `&WaveState` alongside
1296 /// `&CoreState` so the borrow scopes stay disjoint and visible.
1297 fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
1298 ws.pending_fires
1299 .iter()
1300 .copied()
1301 .min_by_key(|&id| s.nodes.get(&id).map_or(0, |r| r.topo_rank))
1302 }
1303
1304 /// Wave drain entry point. Dispatches via `rec.op` to either the
1305 /// regular fn-fire path ([`Self::fire_regular`]) or the operator
1306 /// dispatch ([`Self::fire_operator`]).
1307 pub(crate) fn fire_fn(&self, node_id: NodeId) {
1308 let op = {
1309 let s = self.lock_state();
1310 s.nodes.get(&node_id).and_then(|r| r.op)
1311 };
1312 match op {
1313 Some(operator_op) => self.fire_operator(node_id, operator_op),
1314 None => {
1315 // State / Derived / Dynamic / Producer all dispatch via fn_id.
1316 self.fire_regular(node_id);
1317 }
1318 }
1319 }
1320
1321 /// Fire a node's fn lock-released around `invoke_fn`.
1322 ///
1323 /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
1324 /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
1325 /// or stateless.
1326 ///
1327 /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
1328 /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
1329 /// wave — the in_tick gate composes naturally because nested calls
1330 /// observe `in_tick = true` and skip their own drain.
1331 ///
1332 /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
1333 /// decide between Noop+RESOLVED, single Data, or Batch.
1334 ///
1335 /// Phase 4: commit emissions. Single Data goes through
1336 /// `commit_emission` (with equals substitution). Batch emissions are
1337 /// processed in sequence — Data via `commit_emission_verbatim` (no
1338 /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
1339 /// terminal cascade.
1340 #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
1341 fn fire_regular(&self, node_id: NodeId) {
1342 enum FireAction {
1343 None,
1344 SingleData(HandleId),
1345 Batch(SmallVec<[FnEmission; 2]>),
1346 }
1347
1348 // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
1349 // `has_fired_once` is captured here for the Slice E2 OnRerun gate
1350 // (Phase 1.5 below): the cleanup hook only fires when the fn has
1351 // run at least once already in this activation cycle.
1352 let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool, bool)> = {
1353 let s = self.lock_state();
1354 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1355 // on per-thread WaveState. Removed via with_wave_state — no
1356 // re-entry concern because only the immediate remove happens
1357 // under the borrow.
1358 with_wave_state(|ws| {
1359 ws.pending_fires.remove(&node_id);
1360 });
1361 let rec = s.require_node(node_id);
1362 // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
1363 // mode opts out of the gate per D011), or stateless.
1364 //
1365 // D263: when `terminal_as_real_input == true`, a terminal dep
1366 // counts as "real input" so the gate opens on COMPLETE-without-
1367 // DATA from any dep (mirrors `fire_operator`'s unconditional
1368 // terminal-aware clause; gated here per-node so the historical
1369 // sentinel-hold behaviour stays the default for `fire_fn`).
1370 let has_real_input = !rec.has_sentinel_deps()
1371 || (rec.terminal_as_real_input
1372 && rec.dep_records.iter().any(|dr| dr.terminal.is_some()));
1373 if rec.terminal.is_some() || (!rec.partial && !has_real_input) {
1374 None
1375 } else {
1376 rec.fn_id.map(|fn_id| {
1377 let use_mask = rec.dep_records.len() <= 64;
1378 let mask = rec.involved_mask;
1379 let dep_batches: Vec<DepBatch> = rec
1380 .dep_records
1381 .iter()
1382 .enumerate()
1383 .map(|(i, dr)| DepBatch {
1384 data: dr.data_batch.clone(),
1385 prev_data: dr.prev_data,
1386 // §10.3 perf (Slice V1): derive from bitmask
1387 // for ≤64 deps; fall back to per-dep field.
1388 involved: if use_mask {
1389 (mask >> i) & 1 != 0
1390 } else {
1391 dr.involved_this_wave
1392 },
1393 })
1394 .collect();
1395 (
1396 fn_id,
1397 dep_batches,
1398 rec.is_dynamic,
1399 rec.has_fired_once,
1400 rec.is_producer(),
1401 )
1402 })
1403 }
1404 };
1405 let Some((fn_id, dep_batches, is_dynamic, has_fired_once, is_producer)) = prep else {
1406 return;
1407 };
1408
1409 // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
1410 // the fn has fired at least once in this activation cycle, fire its
1411 // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
1412 // local resources. First-fire is intentionally skipped — there is
1413 // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
1414 // cleanup re-entrance is not the A6 reentrancy concern (which
1415 // protects against `set_deps(self, ...)` from inside the in-flight
1416 // invoke_fn). Operator nodes never reach this path (`fire_regular`
1417 // is the fn-id branch of `fire_fn`; operators dispatch via
1418 // `fire_operator`), so cleanup hooks correctly only fire for fn-
1419 // shaped nodes (state / derived / dynamic / producer).
1420 if has_fired_once {
1421 self.binding
1422 .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
1423 }
1424
1425 // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
1426 // the FFI call only — Phase 3's lock-held state mutation is not part
1427 // of "currently firing" because set_deps would already block on the
1428 // state lock by then. Drop on the guard pops the stack even if
1429 // invoke_fn panics, keeping `currently_firing` balanced.
1430 //
1431 // D246 rule 5 / D245 (QA D1) — the owner-side full-`Core` facade
1432 // hand-off is needed ONLY by producer-building bindings (to
1433 // construct `ProducerCtx` from a real Core surface here without a
1434 // thread-local / `Core` clone / stored back-ref — all β-invalid
1435 // under the actor model). Branch on node kind so the hot
1436 // derived/dynamic/state path keeps the single parameterless
1437 // `invoke_fn` virtual call it always had (no `&dyn CoreFull`
1438 // fat-pointer coercion, no default-body re-dispatch) — byte-
1439 // identical to pre-D246, zero §7-floor regression. Only the rare
1440 // producer-build fire pays the facade hand-off. `self: &Core`
1441 // unsized-coerces to `&dyn CoreFull` (`Core: CoreFull`).
1442 let result = {
1443 let _firing = FiringGuard::new(self, node_id);
1444 if is_producer {
1445 self.binding
1446 .invoke_fn_with_core(node_id, fn_id, &dep_batches, self)
1447 } else {
1448 self.binding.invoke_fn(node_id, fn_id, &dep_batches)
1449 }
1450 };
1451
1452 // Phase 3: apply result under the lock — defensive terminal check
1453 // (a sibling cascade may have terminated this node during phase 2).
1454 let action: FireAction = {
1455 let mut s = self.lock_state();
1456 // Defensive: node may have terminated mid-phase-2 via a sibling
1457 // cascade (a fn that re-entered `Core::error` on a path that
1458 // cascaded here). If so, release any payload handles and no-op.
1459 if s.require_node(node_id).terminal.is_some() {
1460 match &result {
1461 FnResult::Data { handle, .. } => {
1462 self.binding.release_handle(*handle);
1463 }
1464 FnResult::Batch { emissions, .. } => {
1465 for em in emissions {
1466 match em {
1467 FnEmission::Data(h) | FnEmission::Error(h) => {
1468 self.binding.release_handle(*h);
1469 }
1470 FnEmission::Complete => {}
1471 }
1472 }
1473 }
1474 FnResult::Noop { .. } => {}
1475 }
1476 return;
1477 }
1478 let rec = s.require_node_mut(node_id);
1479 rec.has_fired_once = true;
1480 if is_dynamic {
1481 let tracked = match &result {
1482 FnResult::Data { tracked, .. }
1483 | FnResult::Noop { tracked }
1484 | FnResult::Batch { tracked, .. } => tracked.clone(),
1485 };
1486 if let Some(t) = tracked {
1487 rec.tracked = t.into_iter().collect();
1488 }
1489 }
1490 match result {
1491 FnResult::Noop { .. } => {
1492 // Slice G: skip Resolved if a prior emission in the same
1493 // wave already queued tier-3 (would violate R1.3.3.a).
1494 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
1495 // lives on per-thread WaveState. Borrow scoped to the
1496 // tier3 read so queue_notify (which re-borrows
1497 // WaveState) doesn't double-borrow.
1498 let already_dirty = s.require_node(node_id).dirty;
1499 let already_tier3 = with_wave_state(|ws| {
1500 ws.pending_notify
1501 .get(&node_id)
1502 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1503 });
1504 if already_dirty && !already_tier3 {
1505 self.queue_notify(&mut s, node_id, Message::Resolved);
1506 }
1507 FireAction::None
1508 }
1509 FnResult::Data { handle, .. } => FireAction::SingleData(handle),
1510 FnResult::Batch { emissions, .. } if emissions.is_empty() => {
1511 // Empty Batch is equivalent to Noop — settle with
1512 // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
1513 // skip if a prior emission already queued tier-3.
1514 // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
1515 // arm above for the WaveState borrow scope rationale.
1516 let already_dirty = s.require_node(node_id).dirty;
1517 let already_tier3 = with_wave_state(|ws| {
1518 ws.pending_notify
1519 .get(&node_id)
1520 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1521 });
1522 if already_dirty && !already_tier3 {
1523 self.queue_notify(&mut s, node_id, Message::Resolved);
1524 }
1525 FireAction::None
1526 }
1527 FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
1528 }
1529 };
1530
1531 // Phase 4: commit emissions.
1532 match action {
1533 FireAction::None => {}
1534 // Single Data — equals substitution applies (R1.3.2).
1535 FireAction::SingleData(handle) => {
1536 self.commit_emission(node_id, handle);
1537 }
1538 // Batch — process in sequence. No equals substitution
1539 // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
1540 FireAction::Batch(emissions) => {
1541 self.commit_batch(node_id, emissions);
1542 }
1543 }
1544 }
1545
1546 /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
1547 /// through `commit_emission_verbatim` (no equals substitution per
1548 /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
1549 /// cascade per R1.3.4; processing stops at the first terminal and
1550 /// remaining handles are released (R1.3.4.a: no further messages
1551 /// after terminal).
1552 fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
1553 let mut iter = emissions.into_iter();
1554 for em in iter.by_ref() {
1555 match em {
1556 FnEmission::Data(handle) => {
1557 self.commit_emission_verbatim(node_id, handle);
1558 }
1559 FnEmission::Complete => {
1560 self.complete(node_id);
1561 break;
1562 }
1563 FnEmission::Error(handle) => {
1564 self.error(node_id, handle);
1565 break;
1566 }
1567 }
1568 }
1569 // Release handles from any emissions after the terminal break.
1570 for em in iter {
1571 match em {
1572 FnEmission::Data(h) | FnEmission::Error(h) => {
1573 self.binding.release_handle(h);
1574 }
1575 FnEmission::Complete => {}
1576 }
1577 }
1578 }
1579
1580 // -------------------------------------------------------------------
1581 // Emission commit — equals-substitution lives here
1582 // -------------------------------------------------------------------
1583
/// Apply a node's emission. `&self`-only; brackets the equals check
/// around a lock release so `BindingBoundary::custom_equals` can re-enter
/// Core safely.
///
/// Phase 1 (lock-held): defensive terminal short-circuit (the lock is
/// dropped and `new_handle` released); snapshot equals_mode + old cache
/// handle.
///
/// Multi-emit shortcut: if this node was already marked as having emitted
/// tier-3 in this wave (`tier3_check`), equals is skipped — prior
/// `Resolved` entries are rewritten to `Data` and the emission is
/// committed via `commit_emission_verbatim`.
///
/// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
/// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
/// crosses to the binding's `custom_equals` oracle, which may re-enter
/// Core.
///
/// Phase 3 (lock-held): terminal re-check, then queue Dirty (only if the
/// node was not already dirty) + Data/Resolved into pending_notify (which
/// snapshots subscribers on first touch). On Data the cache is replaced
/// and children receive the new handle; on Resolved not-yet-involved
/// children are marked dirty and registered for auto-resolve.
///
/// # Panics
///
/// Panics if `new_handle == NO_HANDLE` (R1.2.4).
// Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
// function is already a multi-phase wave-engine routine and breaking
// out the four phases would obscure the lock-discipline.
#[allow(clippy::too_many_lines)]
pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
    assert!(
        new_handle != NO_HANDLE,
        "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
    );

    // Phase 1: terminal short-circuit + snapshot equals/cache.
    let snapshot = {
        let s = self.lock_state();
        let rec = s.require_node(node_id);
        // (§7-D: the throwaway `bench_state_collapse` relocated
        // is_state/producer assert was removed — the normal-path
        // validation in `Core::emit` is retained; the
        // commit_emission single-pass collapse is deferred, §7-A.)
        if rec.terminal.is_some() {
            // Lock is dropped before the binding call.
            drop(s);
            self.binding.release_handle(new_handle);
            return;
        }
        (rec.cache, rec.equals)
    };
    let (old_handle, equals_mode) = snapshot;

    // Slice G (2026-05-07): R1.3.2.d says equals substitution only
    // fires for SINGLE-DATA waves at one node. Detect "this is a
    // subsequent emit in the same wave at this node" via the
    // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
    // (D1 patch, 2026-05-09 — moved off per-partition state to be
    // robust against mid-wave cross-thread `set_deps` partition
    // splits). If set → multi-emit wave: skip equals, queue Data
    // verbatim, retroactively rewrite any prior Resolved (queued by
    // an earlier same-value emit's equals match) to Data using the
    // wave-start cache snapshot. Outside batch / first emit:
    // standard per-emit equals path. Thread-local lookup is
    // ~5ns and lock-free.
    let is_subsequent_emit_in_wave = tier3_check(node_id);

    if is_subsequent_emit_in_wave {
        // Multi-emit wave detected. Skip equals, queue Data verbatim.
        // Also rewrite any prior Resolved entries to Data using the
        // wave-start cache snapshot.
        self.rewrite_prior_resolved_to_data(node_id);
        self.commit_emission_verbatim(node_id, new_handle);
        return;
    }

    // Phase 2: equals check (lock-released for Custom). `is_data` is true
    // when the new handle is NOT considered equal to the phase-1 cache.
    let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);

    // Phase 3: apply emission under the lock. Defensive terminal
    // re-check — a concurrent cascade between phase 2 and phase 3
    // could have terminated the node.
    let mut s = self.lock_state();
    if s.require_node(node_id).terminal.is_some() {
        drop(s);
        self.binding.release_handle(new_handle);
        return;
    }

    // R1.3.1.a condition (b): synthesize DIRTY only if node not already
    // dirty from an earlier emission in the same wave.
    let already_dirty = s.require_node(node_id).dirty;
    s.require_node_mut(node_id).dirty = true;
    if !already_dirty {
        self.queue_notify(&mut s, node_id, Message::Dirty);
    }

    if is_data {
        // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
        // re-entry from a `custom_equals` oracle that called back into
        // `Core::emit` on this same node during phase 2's lock-released
        // equals check could have advanced the cache between phase 1's
        // snapshot (`old_handle`) and this point.
        let current_cache = s.require_node(node_id).cache;
        // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
        // lives on per-thread WaveState. `in_tick` is the one-Core-
        // per-OS-thread [`IN_TICK_OWNED`] slot (D252); this read is on
        // the wave-owner thread, so it observes this thread's own
        // ownership.
        let in_tick = self.in_tick();
        // `snapshot_taken == true` means the outgoing cache handle was
        // moved into `wave_cache_snapshots` (first snapshot for this node
        // in the wave), so it must not be released below.
        let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
            use std::collections::hash_map::Entry;
            with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
                Entry::Vacant(slot) => {
                    slot.insert(current_cache);
                    true
                }
                Entry::Occupied(_) => false,
            })
        } else {
            false
        };
        s.require_node_mut(node_id).cache = new_handle;
        // The old cache handle is released here unless the snapshot map
        // took it over above.
        if current_cache != NO_HANDLE && !snapshot_taken {
            self.binding.release_handle(current_cache);
        }
        // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
        // buffer if the node opted in. RESOLVED entries are NOT
        // buffered (canonical "DATA only").
        self.push_replay_buffer(&mut s, node_id, new_handle);
        // Slice G (D1 patch, 2026-05-09): mark this node as having
        // emitted tier-3 in this wave on the per-thread tracker.
        tier3_mark(node_id);
        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
        // Propagate to children. The id list is copied out first so the
        // loop body can take `&mut s`.
        let child_ids: Vec<NodeId> = s
            .children
            .get(&node_id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        for child_id in child_ids {
            // A child that does not list this node as a dep is skipped.
            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
            if let Some(idx) = dep_idx {
                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
            }
        }
    } else {
        // RESOLVED: handle unchanged. Don't release; old still in use.
        // NOTE(review): on this branch `new_handle` is neither stored nor
        // released by this function — confirm that matches the refcount
        // contract when `custom_equals` reports two distinct handles equal.
        // Slice G: snapshot cache so a subsequent same-wave emit can
        // rewrite this Resolved to Data using the snapshot.
        // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
        // lives on per-thread WaveState. /qa F1 reverted (2026-05-10);
        // D252 (S5) collapsed to one-Core-per-OS-thread `Cell<u64>` —
        // `in_tick` is read on the wave-owner
        // thread (observes this thread's own ownership).
        let current_cache = s.require_node(node_id).cache;
        if self.in_tick() && current_cache != NO_HANDLE {
            use std::collections::hash_map::Entry;
            with_wave_state(|ws| {
                // The cache keeps its own share, so the snapshot takes a
                // fresh retain (first snapshot for this node only).
                if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
                    self.binding.retain_handle(current_cache);
                    slot.insert(current_cache);
                }
            });
        }
        // Slice G (D1 patch, 2026-05-09): mark this node as having
        // emitted tier-3 in this wave on the per-thread tracker.
        tier3_mark(node_id);
        self.queue_notify(&mut s, node_id, Message::Resolved);
        let child_ids: Vec<NodeId> = s
            .children
            .get(&node_id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        // /qa A7 fix (2026-05-09): collect auto-resolve inserts during
        // the loop and bulk-insert them into `pending_auto_resolve` in
        // ONE `with_wave_state` call after the loop, instead of one
        // acquisition per child. `pending_auto_resolve` now lives on the
        // per-thread WaveState (D108); it was previously behind a
        // `cross_partition` mutex. The insert stays outside the loop
        // because `queue_notify` (called inside it) accesses the same
        // wave-side state itself — presumably a nested access would
        // conflict; verify against `queue_notify`.
        let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
        for child_id in child_ids {
            let already_involved = s.require_node(child_id).involved_this_wave;
            if !already_involved {
                {
                    let child = s.require_node_mut(child_id);
                    child.involved_this_wave = true;
                    child.dirty = true;
                }
                self.queue_notify(&mut s, child_id, Message::Dirty);
                // Deferred to the after-loop bulk insert per the /qa A7
                // fix above.
                auto_resolve_inserts.push(child_id);
            }
        }
        // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
        // single WaveState borrow for the bulk-insert. queue_notify
        // above no longer holds the WaveState borrow by the time we
        // reach here, so this borrow is uncontested.
        if !auto_resolve_inserts.is_empty() {
            with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
        }
    }
}
1783
1784 /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1785 /// emit arrives while a prior tier-3 message is still pending), rewrite
1786 /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1787 /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1788 /// Touches both `pending_notify` (immediate-flush path) and the per-node
1789 /// pause buffer (paused path).
1790 fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1791 let mut s = self.lock_state();
1792 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
1793 // and pending_notify both live on per-thread WaveState. Single
1794 // WaveState borrow handles both the snapshot lookup and the
1795 // pending_notify rewrite; the pause-buffer path uses the state
1796 // lock and is independent of WaveState.
1797 let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
1798 Some(h) if h != NO_HANDLE => h,
1799 // No snapshot available — the prior Resolved was queued without
1800 // a cache (sentinel pre-emit). Nothing to rewrite to; the
1801 // multi-emit case from sentinel is fine (verbatim Data path).
1802 _ => return,
1803 };
1804 let mut retains_needed = 0u32;
1805 // Pending_notify path. Walk all batches' messages — Slice-G
1806 // coalescing reasons about wave-content per node, not per-batch.
1807 with_wave_state(|ws| {
1808 if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
1809 for msg in entry.iter_messages_mut() {
1810 if matches!(msg, Message::Resolved) {
1811 *msg = Message::Data(snapshot);
1812 retains_needed += 1;
1813 }
1814 }
1815 }
1816 });
1817 // Pause-buffer path.
1818 if let Some(rec) = s.nodes.get_mut(&node_id) {
1819 if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1820 for msg in &mut *buffer {
1821 if matches!(msg, Message::Resolved) {
1822 *msg = Message::Data(snapshot);
1823 retains_needed += 1;
1824 }
1825 }
1826 }
1827 }
1828 drop(s);
1829 // Each rewritten Resolved → Data adds a payload retain that
1830 // queue_notify would otherwise have taken at emit time. The
1831 // snapshot already owns one retain (taken when cache was
1832 // snapshotted); we need one fresh retain per rewrite.
1833 for _ in 0..retains_needed {
1834 self.binding.retain_handle(snapshot);
1835 }
1836 }
1837
1838 /// Equals check that crosses the binding boundary lock-released for
1839 /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1840 fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1841 if a == b {
1842 return true; // identity-on-handles always sufficient
1843 }
1844 if a == NO_HANDLE || b == NO_HANDLE {
1845 return false;
1846 }
1847 match mode {
1848 EqualsMode::Identity => false,
1849 EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1850 }
1851 }
1852
1853 /// Commit a DATA emission **without** equals substitution — used by
1854 /// `FnResult::Batch` processing where multi-message waves pass through
1855 /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
1856 /// R1.3.1.a condition (b): only queued if node not already dirty.
1857 ///
1858 /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1859 /// but skips the Phase 2 equals check entirely.
1860 fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1861 assert!(
1862 new_handle != NO_HANDLE,
1863 "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1864 );
1865
1866 let mut s = self.lock_state();
1867 let rec = s.require_node(node_id);
1868 if rec.terminal.is_some() {
1869 drop(s);
1870 self.binding.release_handle(new_handle);
1871 return;
1872 }
1873
1874 // R1.3.1.a condition (b): DIRTY only if not already dirty.
1875 let already_dirty = s.require_node(node_id).dirty;
1876 s.require_node_mut(node_id).dirty = true;
1877 if !already_dirty {
1878 self.queue_notify(&mut s, node_id, Message::Dirty);
1879 }
1880
1881 // Always DATA — no equals substitution for Batch emissions.
1882 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1883 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10);
1884 // D252 (S5) collapsed to one-Core-per-OS-thread `Cell<u64>` —
1885 // `in_tick` is read on the wave-owner thread.
1886 let current_cache = s.require_node(node_id).cache;
1887 let snapshot_taken = if self.in_tick() && current_cache != NO_HANDLE {
1888 use std::collections::hash_map::Entry;
1889 with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1890 Entry::Vacant(slot) => {
1891 slot.insert(current_cache);
1892 true
1893 }
1894 Entry::Occupied(_) => false,
1895 })
1896 } else {
1897 false
1898 };
1899 s.require_node_mut(node_id).cache = new_handle;
1900 if current_cache != NO_HANDLE && !snapshot_taken {
1901 self.binding.release_handle(current_cache);
1902 }
1903 // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1904 self.push_replay_buffer(&mut s, node_id, new_handle);
1905 // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
1906 // tier3_emitted_this_wave on the per-thread tracker even on the
1907 // verbatim path. A subsequent commit_emission at the same node
1908 // in the same wave needs this flag to detect multi-emit and
1909 // skip equals substitution; without it, a Batch-then-standard
1910 // sequence would queue Resolved into a wave that already has
1911 // Data — violating R1.3.3.a. The Batch path itself still
1912 // passes verbatim per R1.3.3.c (we don't re-run equals here);
1913 // we just record that "this node has emitted tier-3 in this
1914 // wave."
1915 tier3_mark(node_id);
1916 self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1917 // Propagate to children
1918 let child_ids: Vec<NodeId> = s
1919 .children
1920 .get(&node_id)
1921 .map(|c| c.iter().copied().collect())
1922 .unwrap_or_default();
1923 for child_id in child_ids {
1924 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1925 if let Some(idx) = dep_idx {
1926 self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1927 }
1928 }
1929 }
1930
1931 /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
1932 /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
1933 /// fresh retain on push. RESOLVED is NOT buffered per canonical
1934 /// "DATA only" — call sites only invoke this for Data emissions.
1935 ///
1936 /// Evicted handle is queued into `cps.deferred_handle_releases`
1937 /// (released lock-released at flush time) per the binding-boundary
1938 /// lock-release discipline — `release_handle` may re-enter Core via
1939 /// finalizers and must not run while the state lock is held
1940 /// (QA A3, 2026-05-07). Q2 (2026-05-09): the queue moved to
1941 /// CrossPartitionState; this fn acquires `cross_partition` only
1942 /// when an eviction actually happens (the common case is no
1943 /// eviction → no second-mutex acquire).
1944 fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
1945 let rec = s.require_node_mut(node_id);
1946 let cap = match rec.replay_buffer_cap {
1947 Some(c) if c > 0 => c,
1948 _ => return,
1949 };
1950 self.binding.retain_handle(new_handle);
1951 rec.replay_buffer.push_back(new_handle);
1952 let evicted = if rec.replay_buffer.len() > cap {
1953 rec.replay_buffer.pop_front()
1954 } else {
1955 None
1956 };
1957 if let Some(h) = evicted {
1958 with_wave_state(|ws| ws.deferred_handle_releases.push(h));
1959 }
1960 }
1961
1962 // ===================================================================
1963 // Operator dispatch (Slice C-1, D009).
1964 //
1965 // `fire_operator` is the entry point for nodes whose `kind` is
1966 // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
1967 // to per-operator helpers that snapshot inputs under the lock, drop the
1968 // lock to call the binding's bulk projection FFI, and reacquire to
1969 // apply emissions via `commit_emission_verbatim` (no per-item equals
1970 // dedup at the wire — operator output passes verbatim per the same
1971 // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1972 //
1973 // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1974 // share retains owned by the wave's data-batch slot (released at
1975 // wave-end rotation in `clear_wave_state`). Operators that emit those
1976 // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1977 // `prev` carry-over) take an additional retain via `retain_handle`
1978 // before passing to `commit_emission_verbatim` — the cache slot owns
1979 // its own share, independent of the data-batch slot's. Operators that
1980 // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1981 // packed tuples) receive retains pre-bumped by the binding's bulk-
1982 // projection method.
1983 // ===================================================================
1984
1985 /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1986 /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1987 /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1988 fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1989 // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1990 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
1991 // per-thread WaveState; state lock + WaveState borrow are
1992 // independent.
1993 let proceed = {
1994 let s = self.lock_state();
1995 with_wave_state(|ws| {
1996 ws.pending_fires.remove(&node_id);
1997 });
1998 let rec = s.require_node(node_id);
1999 if rec.terminal.is_some() {
2000 false
2001 } else {
2002 // First-run gate (R2.5.3 / R5.4). Partial-mode operators
2003 // (D011) opt out of the gate; otherwise we wait for every
2004 // dep to have delivered at least one real handle. Terminal-
2005 // aware operators (currently `Reduce`) additionally count a
2006 // dep terminal as "real input" so they can fire on
2007 // upstream COMPLETE-without-DATA and emit the seed.
2008 let has_real_input = !rec.has_sentinel_deps()
2009 || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
2010 rec.partial || has_real_input
2011 }
2012 };
2013 if !proceed {
2014 return;
2015 }
2016
2017 // A6 (Slice F, 2026-05-07): track operator fire on the
2018 // `currently_firing` stack so a binding-side project/predicate/fold
2019 // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
2020 // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
2021 // the stack on panic too.
2022 let _firing = FiringGuard::new(self, node_id);
2023
2024 match op {
2025 OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
2026 OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
2027 OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
2028 OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
2029 OperatorOp::DistinctUntilChanged { equals_fn_id } => {
2030 self.fire_op_distinct(node_id, equals_fn_id);
2031 }
2032 OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
2033 OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
2034 OperatorOp::WithLatestFrom { pack_fn } => {
2035 self.fire_op_with_latest_from(node_id, pack_fn);
2036 }
2037 OperatorOp::Merge => self.fire_op_merge(node_id),
2038 OperatorOp::Take { count } => self.fire_op_take(node_id, count),
2039 OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
2040 OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
2041 // The variant carries `default` for `register_operator`'s
2042 // `make_op_scratch` path; once registered, the live default
2043 // is read from `LastState::default` inside `fire_op_last`.
2044 OperatorOp::Last { .. } => self.fire_op_last(node_id),
2045 OperatorOp::Tap { fn_id } => self.fire_op_tap(node_id, fn_id),
2046 OperatorOp::TapFirst { fn_id } => self.fire_op_tap_first(node_id, fn_id),
2047 OperatorOp::Valve => self.fire_op_valve(node_id),
2048 OperatorOp::Settle {
2049 quiet_waves,
2050 max_waves,
2051 } => self.fire_op_settle(node_id, quiet_waves, max_waves),
2052 }
2053 }
2054
2055 /// Snapshot the operator's single dep batch (transform constraint —
2056 /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
2057 /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
2058 /// `dep_records[0].terminal` at snapshot time.
2059 fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
2060 let s = self.lock_state();
2061 let rec = s.require_node(node_id);
2062 debug_assert!(
2063 !rec.dep_records.is_empty(),
2064 "transform operator must have ≥1 dep"
2065 );
2066 let dr = &rec.dep_records[0];
2067 (dr.data_batch.iter().copied().collect(), dr.terminal)
2068 }
2069
2070 /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
2071 /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
2072 /// when a wave's inputs all suppress and the operator needs to settle
2073 /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
2074 /// on full-reject).
2075 fn settle_dirty_resolved(&self, node_id: NodeId) {
2076 let mut s = self.lock_state();
2077 if s.require_node(node_id).terminal.is_some() {
2078 return;
2079 }
2080 let already_dirty = s.require_node(node_id).dirty;
2081 s.require_node_mut(node_id).dirty = true;
2082 if !already_dirty {
2083 self.queue_notify(&mut s, node_id, Message::Dirty);
2084 }
2085 // Slice G: skip Resolved if pending_notify already has a tier-3
2086 // message — adding Resolved would violate R1.3.3.a.
2087 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2088 // on per-thread WaveState; borrow scoped so queue_notify can
2089 // re-borrow.
2090 let already_tier3 = with_wave_state(|ws| {
2091 ws.pending_notify
2092 .get(&node_id)
2093 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
2094 });
2095 if !already_tier3 {
2096 self.queue_notify(&mut s, node_id, Message::Resolved);
2097 }
2098 }
2099
2100 /// `OperatorOp::Map` dispatch.
2101 fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2102 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2103 // Mark fired regardless of input count (activation gate already
2104 // satisfied or partial-mode).
2105 {
2106 let mut s = self.lock_state();
2107 s.require_node_mut(node_id).has_fired_once = true;
2108 }
2109 if inputs.is_empty() {
2110 return;
2111 }
2112 // Phase 2 (lock-released): bulk project. Binding returns one
2113 // handle per input, each with a retain share already taken.
2114 let outputs = self.binding.project_each(fn_id, &inputs);
2115 // Phase 3: emit each output. `commit_emission_verbatim` consumes
2116 // the retain into the cache slot (and releases the prior cache
2117 // handle internally).
2118 for h in outputs {
2119 self.commit_emission_verbatim(node_id, h);
2120 }
2121 }
2122
2123 /// `OperatorOp::Filter` dispatch (D012/D018).
2124 fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2125 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2126 {
2127 let mut s = self.lock_state();
2128 s.require_node_mut(node_id).has_fired_once = true;
2129 }
2130 if inputs.is_empty() {
2131 return;
2132 }
2133 // Phase 2: predicate per input.
2134 let pass = self.binding.predicate_each(fn_id, &inputs);
2135 // Slice V2: promoted from debug_assert! — binding contract violation
2136 // should fail loud in release builds too.
2137 assert!(
2138 pass.len() == inputs.len(),
2139 "predicate_each returned {} bools for {} inputs",
2140 pass.len(),
2141 inputs.len()
2142 );
2143 // Phase 3: emit passing items verbatim. Take a fresh retain for
2144 // each — the data_batch slot still owns its retain (released at
2145 // wave-end rotation), and the cache slot needs its own.
2146 let mut emitted = 0usize;
2147 for (i, &h) in inputs.iter().enumerate() {
2148 if pass.get(i).copied().unwrap_or(false) {
2149 self.binding.retain_handle(h);
2150 self.commit_emission_verbatim(node_id, h);
2151 emitted += 1;
2152 }
2153 }
2154 // D018: full-reject settles with DIRTY+RESOLVED.
2155 if emitted == 0 {
2156 self.settle_dirty_resolved(node_id);
2157 }
2158 }
2159
2160 /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
2161 fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2162 use crate::op_state::ScanState;
2163 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2164 let acc = {
2165 let s = self.lock_state();
2166 scratch_ref::<ScanState>(&s, node_id).acc
2167 };
2168 {
2169 let mut s = self.lock_state();
2170 s.require_node_mut(node_id).has_fired_once = true;
2171 }
2172 if inputs.is_empty() {
2173 return;
2174 }
2175 // Phase 2: fold each input through. Returns N new handles, each
2176 // with a fresh retain.
2177 let new_states = self.binding.fold_each(fn_id, acc, &inputs);
2178 // Slice V2: promoted from debug_assert! — binding contract violation.
2179 assert!(
2180 new_states.len() == inputs.len(),
2181 "fold_each returned {} accs for {} inputs",
2182 new_states.len(),
2183 inputs.len()
2184 );
2185 // Phase 3a: update ScanState.acc to the LAST new acc. Take an
2186 // extra retain for the slot; release the prior acc's slot retain.
2187 let last_acc = new_states.last().copied();
2188 if let Some(last) = last_acc {
2189 let prev_acc = {
2190 let mut s = self.lock_state();
2191 let scratch = scratch_mut::<ScanState>(&mut s, node_id);
2192 let prev = scratch.acc;
2193 scratch.acc = last;
2194 prev
2195 };
2196 // Take the slot's retain on the new acc.
2197 self.binding.retain_handle(last);
2198 // Release the prior slot's retain (post-lock to keep binding
2199 // free to re-enter Core safely).
2200 if prev_acc != crate::handle::NO_HANDLE {
2201 self.binding.release_handle(prev_acc);
2202 }
2203 }
2204 // Phase 3b: emit each intermediate acc verbatim. `new_states`
2205 // entries each carry one retain from `fold_each`; that retain is
2206 // consumed by `commit_emission_verbatim` into the cache slot.
2207 for h in new_states {
2208 self.commit_emission_verbatim(node_id, h);
2209 }
2210 }
2211
2212 /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
2213 /// upstream COMPLETE (cascades ERROR verbatim).
2214 fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2215 use crate::op_state::ReduceState;
2216 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2217 let acc = {
2218 let s = self.lock_state();
2219 scratch_ref::<ReduceState>(&s, node_id).acc
2220 };
2221 {
2222 let mut s = self.lock_state();
2223 s.require_node_mut(node_id).has_fired_once = true;
2224 }
2225 // Phase 2: accumulate (silent — no per-input emit).
2226 let new_states = if inputs.is_empty() {
2227 SmallVec::<[HandleId; 1]>::new()
2228 } else {
2229 self.binding.fold_each(fn_id, acc, &inputs)
2230 };
2231 // Slice V2: promoted from debug_assert! — binding contract violation.
2232 assert!(
2233 new_states.len() == inputs.len(),
2234 "fold_each returned {} accs for {} inputs",
2235 new_states.len(),
2236 inputs.len()
2237 );
2238 // Update ReduceState.acc to last new acc; release intermediate
2239 // states (we don't emit them) and the prior acc's slot retain.
2240 let last_acc = new_states.last().copied();
2241 let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
2242 new_states[..new_states.len() - 1].to_vec()
2243 } else {
2244 Vec::new()
2245 };
2246 let prev_acc_to_release = if let Some(last) = last_acc {
2247 let prev_acc = {
2248 let mut s = self.lock_state();
2249 let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
2250 let prev = scratch.acc;
2251 scratch.acc = last;
2252 prev
2253 };
2254 self.binding.retain_handle(last);
2255 if prev_acc == crate::handle::NO_HANDLE {
2256 None
2257 } else {
2258 Some(prev_acc)
2259 }
2260 } else {
2261 None
2262 };
2263 // Release intermediate fold results (Reduce only emits the LAST,
2264 // but only on terminal). Each was retained by `fold_each`.
2265 for h in intermediates_to_release {
2266 self.binding.release_handle(h);
2267 }
2268 if let Some(h) = prev_acc_to_release {
2269 self.binding.release_handle(h);
2270 }
2271
2272 // Phase 3: emit on terminal.
2273 match terminal {
2274 None => {
2275 // Still accumulating; no emit. Subscribers see no message
2276 // for this wave (silent accumulation). The first wave that
2277 // pushes Reduce to fire produces a Dirty entry on the
2278 // upstream's commit, but Reduce itself doesn't queue any
2279 // tier-3 since R5 silently absorbs. v1: leave the
2280 // post-drain auto-resolve sweep to settle nothing —
2281 // pending_notify has no entry for Reduce so the sweep is
2282 // a no-op.
2283 }
2284 Some(TerminalKind::Complete) => {
2285 // Read the live acc (may be the seed if no DATA arrived)
2286 // and emit Data(acc) + Complete.
2287 let final_acc = {
2288 let s = self.lock_state();
2289 scratch_ref::<ReduceState>(&s, node_id).acc
2290 };
2291 if final_acc != crate::handle::NO_HANDLE {
2292 // Emission needs its own retain (slot's retain is
2293 // owned by ReduceState.acc until reset/Drop).
2294 self.binding.retain_handle(final_acc);
2295 self.commit_emission_verbatim(node_id, final_acc);
2296 }
2297 self.complete(node_id);
2298 }
2299 Some(TerminalKind::Error(h)) => {
2300 // Core::error transfers the caller's share into the
2301 // cascade (node.terminal + per-child dep_terminal slots);
2302 // no release at the error() boundary. Take a fresh share
2303 // here so the cascade owns it independently of the
2304 // dep_records[0].terminal slot's share.
2305 self.binding.retain_handle(h);
2306 self.error(node_id, h);
2307 }
2308 }
2309 }
2310
2311 /// `OperatorOp::DistinctUntilChanged` dispatch.
2312 fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
2313 use crate::op_state::DistinctState;
2314 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2315 let mut prev = {
2316 let s = self.lock_state();
2317 scratch_ref::<DistinctState>(&s, node_id).prev
2318 };
2319 {
2320 let mut s = self.lock_state();
2321 s.require_node_mut(node_id).has_fired_once = true;
2322 }
2323 if inputs.is_empty() {
2324 return;
2325 }
2326 // Take a working-copy retain on the initial prev so both the loop
2327 // (which releases old_prev on each non-equal item) and phase 3
2328 // (which releases the slot's original handle) each have their own
2329 // share. Without this, the loop's release of old_prev (== original
2330 // DistinctState.prev) double-releases against phase 3's stale_slot
2331 // release.
2332 if prev != crate::handle::NO_HANDLE {
2333 self.binding.retain_handle(prev);
2334 }
2335 // Phase 2: per-input equals(prev, current). Each non-equal input
2336 // is emitted and becomes the new prev. Equals fn_id reuses
2337 // `BindingBoundary::custom_equals`.
2338 let mut emitted = 0usize;
2339 for &h in &inputs {
2340 let equal = if prev == crate::handle::NO_HANDLE {
2341 false
2342 } else if prev == h {
2343 true
2344 } else {
2345 self.binding.custom_equals(equals_fn_id, prev, h)
2346 };
2347 if !equal {
2348 // Emit this input verbatim.
2349 self.binding.retain_handle(h);
2350 self.commit_emission_verbatim(node_id, h);
2351 // Update prev: take retain on new prev, release old
2352 // (working-copy retain from above or from prior iteration).
2353 self.binding.retain_handle(h);
2354 let old_prev = prev;
2355 prev = h;
2356 if old_prev != crate::handle::NO_HANDLE {
2357 self.binding.release_handle(old_prev);
2358 }
2359 emitted += 1;
2360 }
2361 }
2362 // Phase 3: persist prev into DistinctState.prev slot. Release the
2363 // slot's original retain (stale_slot) — this is the slot-owned
2364 // share, independent of the working-copy share released in the
2365 // loop above.
2366 {
2367 let mut s = self.lock_state();
2368 let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
2369 let stale_slot = scratch.prev;
2370 scratch.prev = prev;
2371 if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2372 drop(s);
2373 self.binding.release_handle(stale_slot);
2374 }
2375 }
2376 // Release the working-copy retain on the final prev if it was
2377 // never released in the loop (i.e. no non-equal items passed,
2378 // prev == original). In that case stale_slot == prev, so phase 3
2379 // didn't release it either — but the working-copy retain is still
2380 // outstanding. Release it now.
2381 if emitted == 0 && prev != crate::handle::NO_HANDLE {
2382 self.binding.release_handle(prev);
2383 }
2384 if emitted == 0 {
2385 self.settle_dirty_resolved(node_id);
2386 }
2387 }
2388
2389 /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
2390 /// starting after the second value (first input swallowed, sets `prev`).
2391 fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2392 use crate::op_state::PairwiseState;
2393 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2394 let mut prev = {
2395 let s = self.lock_state();
2396 scratch_ref::<PairwiseState>(&s, node_id).prev
2397 };
2398 {
2399 let mut s = self.lock_state();
2400 s.require_node_mut(node_id).has_fired_once = true;
2401 }
2402 if inputs.is_empty() {
2403 return;
2404 }
2405 let mut emitted = 0usize;
2406 for &h in &inputs {
2407 if prev == crate::handle::NO_HANDLE {
2408 // First-ever value — swallow, set prev. Retain for the
2409 // PairwiseState.prev slot (persisted in phase 3 below).
2410 self.binding.retain_handle(h);
2411 prev = h;
2412 continue;
2413 }
2414 // Pack (prev, current) into a tuple handle. Binding returns a
2415 // fresh retain on the packed handle.
2416 let packed = self.binding.pairwise_pack(fn_id, prev, h);
2417 self.commit_emission_verbatim(node_id, packed);
2418 // Advance prev: take retain on h, release old prev.
2419 self.binding.retain_handle(h);
2420 let old_prev = prev;
2421 prev = h;
2422 self.binding.release_handle(old_prev);
2423 emitted += 1;
2424 }
2425 // Persist prev into PairwiseState.prev slot.
2426 {
2427 let mut s = self.lock_state();
2428 let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
2429 let stale_slot = scratch.prev;
2430 scratch.prev = prev;
2431 if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2432 drop(s);
2433 self.binding.release_handle(stale_slot);
2434 }
2435 }
2436 if emitted == 0 {
2437 self.settle_dirty_resolved(node_id);
2438 }
2439 }
2440
2441 // =================================================================
2442 // Slice C-2: multi-dep combinator operators (D020)
2443 // =================================================================
2444
2445 /// Snapshot all deps' "latest" handle for multi-dep combinators.
2446 /// For each dep: returns `data_batch.last()` if non-empty (dep fired
2447 /// this wave), else `prev_data` (last handle from previous wave).
2448 /// Also returns whether dep[0] (primary) had DATA this wave —
2449 /// needed by `fire_op_with_latest_from`.
2450 fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
2451 let s = self.lock_state();
2452 let rec = s.require_node(node_id);
2453 let primary_fired = rec
2454 .dep_records
2455 .first()
2456 .is_some_and(|dr| !dr.data_batch.is_empty());
2457 let latest: SmallVec<[HandleId; 4]> = rec
2458 .dep_records
2459 .iter()
2460 .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
2461 .collect();
2462 (latest, primary_fired)
2463 }
2464
2465 /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
2466 /// latest handle per dep into a tuple via `pack_tuple`, emits on
2467 /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
2468 /// all deps have a real handle on first fire. Post-warmup INVALIDATE
2469 /// guard: if any dep's prev_data was cleared, settles with RESOLVED
2470 /// instead of packing a NO_HANDLE into the tuple.
2471 fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2472 let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
2473 {
2474 let mut s = self.lock_state();
2475 s.require_node_mut(node_id).has_fired_once = true;
2476 }
2477 // Post-warmup INVALIDATE guard: a dep may have been invalidated
2478 // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2479 if latest.contains(&crate::handle::NO_HANDLE) {
2480 self.settle_dirty_resolved(node_id);
2481 return;
2482 }
2483 let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2484 self.commit_emission_verbatim(node_id, tuple_handle);
2485 }
2486
2487 /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
2488 /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
2489 /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
2490 /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
2491 /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
2492 fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2493 let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
2494 let first_fire = {
2495 let mut s = self.lock_state();
2496 let rec = s.require_node_mut(node_id);
2497 let was_first = !rec.has_fired_once;
2498 rec.has_fired_once = true;
2499 was_first
2500 };
2501 // On first fire (gate release), always emit — the first-run gate
2502 // guarantees both deps have values (via prev_data fallback in
2503 // snapshot). On subsequent fires, only emit when primary fires.
2504 if !first_fire && !primary_fired {
2505 // Secondary-only update — no downstream DATA.
2506 self.settle_dirty_resolved(node_id);
2507 return;
2508 }
2509 // Post-warmup INVALIDATE guard: secondary may have been invalidated
2510 // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2511 debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
2512 if latest[1] == crate::handle::NO_HANDLE {
2513 self.settle_dirty_resolved(node_id);
2514 return;
2515 }
2516 let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2517 self.commit_emission_verbatim(node_id, tuple_handle);
2518 }
2519
2520 /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
2521 /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
2522 /// batch handles are collected, retained, and emitted individually.
2523 fn fire_op_merge(&self, node_id: NodeId) {
2524 // Collect all batch handles from all deps (flat).
2525 let all_handles: Vec<HandleId> = {
2526 let s = self.lock_state();
2527 let rec = s.require_node(node_id);
2528 rec.dep_records
2529 .iter()
2530 .flat_map(|dr| dr.data_batch.iter().copied())
2531 .collect()
2532 };
2533 {
2534 let mut s = self.lock_state();
2535 s.require_node_mut(node_id).has_fired_once = true;
2536 }
2537 if all_handles.is_empty() {
2538 // All deps settled RESOLVED this wave — no DATA to forward.
2539 self.settle_dirty_resolved(node_id);
2540 return;
2541 }
2542 // Emit each handle verbatim. Take a fresh retain per handle
2543 // (independent of the dep batch's retain which gets released at
2544 // wave-end). Matches Filter's discipline for passing inputs.
2545 for &h in &all_handles {
2546 self.binding.retain_handle(h);
2547 self.commit_emission_verbatim(node_id, h);
2548 }
2549 }
2550
2551 // =================================================================
2552 // Slice C-3: flow operators (D024)
2553 // =================================================================
2554
2555 /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
2556 /// then self-completes via `Core::complete`. When `count == 0`, the
2557 /// first fire emits zero items then immediately self-completes
2558 /// (D027). Cross-wave counter lives in
2559 /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
2560 fn fire_op_take(&self, node_id: NodeId, count: u32) {
2561 use crate::op_state::TakeState;
2562 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2563 // Snapshot current counter; mark fired regardless of input count
2564 // (activation gate already satisfied or partial-mode).
2565 let mut count_emitted = {
2566 let s = self.lock_state();
2567 scratch_ref::<TakeState>(&s, node_id).count_emitted
2568 };
2569 {
2570 let mut s = self.lock_state();
2571 s.require_node_mut(node_id).has_fired_once = true;
2572 }
2573 // Already at quota before any input this wave — self-complete
2574 // immediately. Covers `count == 0` (first-fire short-circuit) and
2575 // any defensive re-entry after the terminal-skip in `fire_operator`
2576 // already guards against double-complete.
2577 if count_emitted >= count {
2578 self.complete(node_id);
2579 return;
2580 }
2581 // Per-input emission loop. Each pass takes a fresh retain for the
2582 // cache slot; data_batch slot's retain is released at wave-end
2583 // rotation independently.
2584 for &h in &inputs {
2585 self.binding.retain_handle(h);
2586 self.commit_emission_verbatim(node_id, h);
2587 count_emitted = count_emitted.saturating_add(1);
2588 if count_emitted >= count {
2589 break;
2590 }
2591 }
2592 // Persist the updated counter.
2593 {
2594 let mut s = self.lock_state();
2595 scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
2596 }
2597 // Self-complete if we hit the quota this wave. Upstream COMPLETE
2598 // (terminal == Some(Complete)) without us hitting the count
2599 // propagates via the standard auto-cascade — we don't intercept it.
2600 if count_emitted >= count {
2601 self.complete(node_id);
2602 return;
2603 }
2604 // If upstream is already Errored and we haven't hit count, the
2605 // standard cascade will propagate it. If the wave delivered no
2606 // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
2607 // subscribers see the wave close.
2608 if inputs.is_empty() && terminal.is_none() {
2609 self.settle_dirty_resolved(node_id);
2610 }
2611 }
2612
2613 /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
2614 /// then forwards the rest. Cross-wave counter lives in
2615 /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
2616 /// On a wave where every input is still in the skip window, settles
2617 /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
2618 fn fire_op_skip(&self, node_id: NodeId, count: u32) {
2619 use crate::op_state::SkipState;
2620 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2621 let mut count_skipped = {
2622 let s = self.lock_state();
2623 scratch_ref::<SkipState>(&s, node_id).count_skipped
2624 };
2625 {
2626 let mut s = self.lock_state();
2627 s.require_node_mut(node_id).has_fired_once = true;
2628 }
2629 // No early-return on empty inputs: the post-loop `emitted == 0`
2630 // settle handles the empty-inputs case identically to the
2631 // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
2632 // with `fire_op_take`).
2633 let mut emitted = 0usize;
2634 for &h in &inputs {
2635 if count_skipped < count {
2636 count_skipped = count_skipped.saturating_add(1);
2637 // Drop this input — the data_batch slot still owns its
2638 // retain (released at wave-end rotation). No emission.
2639 continue;
2640 }
2641 // Past the skip window — emit verbatim. Take a fresh retain
2642 // for the cache slot.
2643 self.binding.retain_handle(h);
2644 self.commit_emission_verbatim(node_id, h);
2645 emitted += 1;
2646 }
2647 // Persist the updated counter.
2648 {
2649 let mut s = self.lock_state();
2650 scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
2651 }
2652 if emitted == 0 {
2653 self.settle_dirty_resolved(node_id);
2654 }
2655 }
2656
2657 /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
2658 /// holds; on the first `false`, emits any preceding passes from the
2659 /// same batch then self-completes via `Core::complete`. Reuses
2660 /// [`BindingBoundary::predicate_each`] (D029).
2661 fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2662 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2663 {
2664 let mut s = self.lock_state();
2665 s.require_node_mut(node_id).has_fired_once = true;
2666 }
2667 if inputs.is_empty() {
2668 return;
2669 }
2670 // Phase 2: predicate per input.
2671 let pass = self.binding.predicate_each(fn_id, &inputs);
2672 // Slice V2: promoted from debug_assert! — binding contract violation
2673 // should fail loud in release builds too.
2674 assert!(
2675 pass.len() == inputs.len(),
2676 "predicate_each returned {} bools for {} inputs",
2677 pass.len(),
2678 inputs.len()
2679 );
2680 // Phase 3: emit each input until the first false; then
2681 // self-complete. `fire_operator`'s `terminal.is_some()`
2682 // short-circuit gates re-entry after the self-complete cascade
2683 // installs the terminal slot — no extra `done` flag needed.
2684 let mut emitted = 0usize;
2685 let mut first_false_seen = false;
2686 for (i, &h) in inputs.iter().enumerate() {
2687 if pass.get(i).copied().unwrap_or(false) {
2688 self.binding.retain_handle(h);
2689 self.commit_emission_verbatim(node_id, h);
2690 emitted += 1;
2691 } else {
2692 first_false_seen = true;
2693 break;
2694 }
2695 }
2696 if first_false_seen {
2697 self.complete(node_id);
2698 return;
2699 }
2700 if emitted == 0 {
2701 // Whole batch passed but was empty (impossible here since
2702 // inputs.is_empty() returned early above) — defensive only.
2703 self.settle_dirty_resolved(node_id);
2704 }
2705 }
2706
2707 /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
2708 /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
2709 /// default was registered) then `Complete` on upstream COMPLETE.
2710 /// On upstream ERROR, propagates verbatim. Storage:
2711 /// [`LastState`](super::op_state::LastState).
2712 ///
2713 /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
2714 /// wave (`terminal == None`), `fire_op_last` updates the buffered
2715 /// `latest` handle but produces NO downstream wire message —
2716 /// subscribers observe the operator only when upstream
2717 /// COMPLETE/ERROR triggers the terminal branch. Intermediate
2718 /// inputs from the dep's batch are dropped on the floor (their
2719 /// `data_batch` retains release at wave-end rotation
2720 /// independently). Per-wave settlement on intermediate waves is
2721 /// the canonical behavior for terminal-aware operators.
2722 fn fire_op_last(&self, node_id: NodeId) {
2723 use crate::op_state::LastState;
2724 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2725 {
2726 let mut s = self.lock_state();
2727 s.require_node_mut(node_id).has_fired_once = true;
2728 }
2729
2730 // Phase 2: buffer the latest input handle (if any). Retain new,
2731 // release old. data_batch slot's retain is released at wave-end
2732 // rotation independently — the LastState slot keeps its own
2733 // share so the value survives across waves.
2734 if let Some(&new_latest) = inputs.last() {
2735 let prev_latest = {
2736 let mut s = self.lock_state();
2737 let scratch = scratch_mut::<LastState>(&mut s, node_id);
2738 let prev = scratch.latest;
2739 scratch.latest = new_latest;
2740 prev
2741 };
2742 self.binding.retain_handle(new_latest);
2743 if prev_latest != crate::handle::NO_HANDLE {
2744 self.binding.release_handle(prev_latest);
2745 }
2746 }
2747
2748 // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
2749 // produce no downstream message — Reduce-style silent
2750 // accumulation. The post-drain auto-resolve sweep is a no-op
2751 // because pending_notify has no entry for Last.
2752 match terminal {
2753 None => {}
2754 Some(TerminalKind::Complete) => {
2755 // Read the live latest + default. If latest != NO_HANDLE,
2756 // emit it. Otherwise, if default != NO_HANDLE, emit default.
2757 // Otherwise, emit only Complete (empty stream, no default).
2758 let (latest, default) = {
2759 let s = self.lock_state();
2760 let scratch = scratch_ref::<LastState>(&s, node_id);
2761 (scratch.latest, scratch.default)
2762 };
2763 let to_emit = if latest != crate::handle::NO_HANDLE {
2764 Some(latest)
2765 } else if default != crate::handle::NO_HANDLE {
2766 Some(default)
2767 } else {
2768 None
2769 };
2770 if let Some(h) = to_emit {
2771 // Emission needs its own retain — the LastState slot
2772 // keeps its share until reset/Drop.
2773 self.binding.retain_handle(h);
2774 self.commit_emission_verbatim(node_id, h);
2775 }
2776 self.complete(node_id);
2777 }
2778 Some(TerminalKind::Error(h)) => {
2779 // Take a fresh share for the error cascade — the
2780 // dep_records[0].terminal slot keeps its own share
2781 // (released by reset_for_fresh_lifecycle / Drop).
2782 self.binding.retain_handle(h);
2783 self.error(node_id, h);
2784 }
2785 }
2786 }
2787
2788 // -----------------------------------------------------------------
2789 // Slice U: control operators — fire_op impls
2790 // -----------------------------------------------------------------
2791
2792 /// Tap — side-effect passthrough. Invoke tap fn on each DATA, then
2793 /// emit each input handle unchanged (zero allocation).
2794 fn fire_op_tap(&self, node_id: NodeId, fn_id: FnId) {
2795 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2796 {
2797 let mut s = self.lock_state();
2798 s.require_node_mut(node_id).has_fired_once = true;
2799 }
2800 if inputs.is_empty() {
2801 if terminal.is_none() {
2802 self.settle_dirty_resolved(node_id);
2803 }
2804 } else {
2805 for &h in &inputs {
2806 self.binding.invoke_tap_fn(fn_id, h);
2807 self.binding.retain_handle(h);
2808 self.commit_emission_verbatim(node_id, h);
2809 }
2810 }
2811 // Terminal forwarding.
2812 match terminal {
2813 None => {}
2814 Some(TerminalKind::Complete) => {
2815 self.binding.invoke_tap_complete_fn(fn_id);
2816 self.complete(node_id);
2817 }
2818 Some(TerminalKind::Error(h)) => {
2819 self.binding.invoke_tap_error_fn(fn_id, h);
2820 self.binding.retain_handle(h);
2821 self.error(node_id, h);
2822 }
2823 }
2824 }
2825
2826 /// TapFirst — one-shot side-effect on first DATA. After the first
2827 /// qualifying DATA, acts as pure passthrough.
2828 fn fire_op_tap_first(&self, node_id: NodeId, fn_id: FnId) {
2829 use crate::op_state::TapFirstState;
2830 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2831 {
2832 let mut s = self.lock_state();
2833 s.require_node_mut(node_id).has_fired_once = true;
2834 }
2835 if inputs.is_empty() {
2836 if terminal.is_none() {
2837 self.settle_dirty_resolved(node_id);
2838 }
2839 } else {
2840 let fired = {
2841 let s = self.lock_state();
2842 scratch_ref::<TapFirstState>(&s, node_id).fired
2843 };
2844 for &h in &inputs {
2845 if !fired {
2846 self.binding.invoke_tap_fn(fn_id, h);
2847 let mut s = self.lock_state();
2848 scratch_mut::<TapFirstState>(&mut s, node_id).fired = true;
2849 }
2850 self.binding.retain_handle(h);
2851 self.commit_emission_verbatim(node_id, h);
2852 }
2853 }
2854 if let Some(TerminalKind::Complete) = terminal {
2855 self.complete(node_id);
2856 } else if let Some(TerminalKind::Error(h)) = terminal {
2857 self.binding.retain_handle(h);
2858 self.error(node_id, h);
2859 }
2860 }
2861
2862 /// Valve — conditional forward. dep[0]=source, dep[1]=control.
2863 /// When control is truthy, forwards source DATA; else RESOLVED.
2864 fn fire_op_valve(&self, node_id: NodeId) {
2865 // Snapshot both deps.
2866 let (src_inputs, src_terminal, ctrl_latest) = {
2867 let s = self.lock_state();
2868 let rec = s.require_node(node_id);
2869 debug_assert!(rec.dep_records.len() == 2, "valve must have exactly 2 deps");
2870 let dr0 = &rec.dep_records[0];
2871 let dr1 = &rec.dep_records[1];
2872 let src_inputs: Vec<HandleId> = dr0.data_batch.iter().copied().collect();
2873 let src_term = dr0.terminal;
2874 // Latest control: last of this wave's batch, or prev_data.
2875 let ctrl = dr1.data_batch.last().copied().unwrap_or(dr1.prev_data);
2876 (src_inputs, src_term, ctrl)
2877 };
2878 {
2879 let mut s = self.lock_state();
2880 s.require_node_mut(node_id).has_fired_once = true;
2881 }
2882
2883 // Source terminal forwarding (D3).
2884 if let Some(TerminalKind::Complete) = src_terminal {
2885 self.complete(node_id);
2886 return;
2887 }
2888 if let Some(TerminalKind::Error(h)) = src_terminal {
2889 self.binding.retain_handle(h);
2890 self.error(node_id, h);
2891 return;
2892 }
2893
2894 // Gate: NO_HANDLE means "gate closed" (control never sent DATA);
2895 // any real handle means "gate open". Proper value-level truthiness
2896 // would require BindingBoundary::is_truthy (deferred — D048).
2897 let gate_open = ctrl_latest != crate::handle::NO_HANDLE;
2898
2899 if !gate_open {
2900 self.settle_dirty_resolved(node_id);
2901 return;
2902 }
2903
2904 if src_inputs.is_empty() {
2905 // Control opened but no source DATA this wave. Re-emit
2906 // prev source value if available.
2907 let prev_src = {
2908 let s = self.lock_state();
2909 s.require_node(node_id).dep_records[0].prev_data
2910 };
2911 if prev_src == crate::handle::NO_HANDLE {
2912 self.settle_dirty_resolved(node_id);
2913 } else {
2914 self.binding.retain_handle(prev_src);
2915 self.commit_emission_verbatim(node_id, prev_src);
2916 }
2917 } else {
2918 for &h in &src_inputs {
2919 self.binding.retain_handle(h);
2920 self.commit_emission_verbatim(node_id, h);
2921 }
2922 }
2923 }
2924
2925 /// Settle — convergence detector. Forwards DATA, counts quiet waves,
2926 /// self-completes when converged.
2927 fn fire_op_settle(&self, node_id: NodeId, quiet_waves: u32, max_waves: Option<u32>) {
2928 use crate::op_state::SettleState;
2929 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2930 {
2931 let mut s = self.lock_state();
2932 s.require_node_mut(node_id).has_fired_once = true;
2933 }
2934
2935 // Terminal forwarding.
2936 if let Some(TerminalKind::Complete) = terminal {
2937 self.complete(node_id);
2938 return;
2939 }
2940 if let Some(TerminalKind::Error(h)) = terminal {
2941 self.binding.retain_handle(h);
2942 self.error(node_id, h);
2943 return;
2944 }
2945
2946 let saw_data = !inputs.is_empty();
2947
2948 // Forward all DATA.
2949 for &h in &inputs {
2950 self.binding.retain_handle(h);
2951 self.commit_emission_verbatim(node_id, h);
2952 }
2953
2954 // Update counters.
2955 let should_complete = {
2956 let mut s = self.lock_state();
2957 let scratch = scratch_mut::<SettleState>(&mut s, node_id);
2958 scratch.wave_count += 1;
2959 if saw_data {
2960 scratch.has_value = true;
2961 scratch.quiet_count = 0;
2962 } else {
2963 scratch.quiet_count += 1;
2964 }
2965 let settled = scratch.has_value && scratch.quiet_count >= quiet_waves;
2966 let exhausted = max_waves.is_some_and(|max| scratch.wave_count >= max);
2967 settled || exhausted
2968 };
2969
2970 if should_complete {
2971 self.complete(node_id);
2972 } else if !saw_data {
2973 self.settle_dirty_resolved(node_id);
2974 }
2975 }
2976
2977 pub(crate) fn deliver_data_to_consumer(
2978 &self,
2979 s: &mut CoreState,
2980 consumer_id: NodeId,
2981 dep_idx: usize,
2982 handle: HandleId,
2983 ) {
2984 // Retain the handle for the batch accumulation slot — each DATA
2985 // handle in `data_batch` owns a retain share, released at wave-end
2986 // rotation in `clear_wave_state`.
2987 self.binding.retain_handle(handle);
2988
2989 let is_dynamic;
2990 let is_state;
2991 let tracked_or_first_fire;
2992 // Slice F audit close (2026-05-07): default-mode pause suppression.
2993 // If the consumer is paused with `PausableMode::Default`, the
2994 // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
2995 // pause-window dep deliveries into one fn execution on RESUME.
2996 // Mark `pending_wave` on the pause state instead of adding to
2997 // `pending_fires`. The dep state still advances (the data_batch push
2998 // above is unchanged), and clear_wave_state still rotates the latest
2999 // dep DATA into prev_data — so when the fn ultimately fires on
3000 // RESUME, it sees the consolidated post-pause state.
3001 let suppressed_for_default_pause;
3002 {
3003 let consumer = s.require_node_mut(consumer_id);
3004 consumer.dep_records[dep_idx].data_batch.push(handle);
3005 consumer.dep_records[dep_idx].involved_this_wave = true;
3006 consumer.involved_this_wave = true;
3007 // §10.13 perf (D047): set received_mask bit on first DATA
3008 // delivery for this dep.
3009 if dep_idx < 64 {
3010 consumer.received_mask |= 1u64 << dep_idx;
3011 // §10.3 perf (Slice V1): set involved_mask bit for
3012 // O(1) per-dep involvement query during fire.
3013 consumer.involved_mask |= 1u64 << dep_idx;
3014 }
3015 is_dynamic = consumer.is_dynamic;
3016 is_state = consumer.is_state();
3017 tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
3018 suppressed_for_default_pause = consumer.pause_state.is_paused()
3019 && consumer.pausable == crate::node::PausableMode::Default;
3020 if suppressed_for_default_pause {
3021 consumer.pause_state.mark_pending_wave();
3022 }
3023 }
3024 if suppressed_for_default_pause {
3025 // Default-mode pause: don't add to pending_fires; RESUME will
3026 // schedule one consolidated fire.
3027 return;
3028 }
3029 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3030 // per-thread WaveState. State lock + WaveState borrow are
3031 // independent.
3032 if is_state {
3033 // State nodes don't have deps; unreachable in practice.
3034 } else if is_dynamic {
3035 if tracked_or_first_fire {
3036 with_wave_state(|ws| {
3037 ws.pending_fires.insert(consumer_id);
3038 });
3039 }
3040 } else {
3041 // Derived / Operator / Producer (Producer has no deps so won't
3042 // reach here, but the predicate-based dispatch handles it
3043 // uniformly).
3044 with_wave_state(|ws| {
3045 ws.pending_fires.insert(consumer_id);
3046 });
3047 }
3048 }
3049
3050 // -------------------------------------------------------------------
3051 // Subscriber notification
3052 // -------------------------------------------------------------------
3053
3054 /// Queue a wave-end message for `node_id`'s subscribers.
3055 ///
3056 /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
3057 /// 2026-05-08):** each push for a given node either appends the
3058 /// message to the open batch (if `NodeRecord::subscribers_revision`
3059 /// hasn't advanced since that batch opened — the common case — no
3060 /// extra allocation), or opens a fresh batch with a current sink
3061 /// snapshot frozen at the new revision. A sub installed mid-wave
3062 /// bumps `subscribers_revision`; the next `queue_notify` for the
3063 /// same node observes the bump and starts a new batch that includes
3064 /// the new sub. Pre-subscribe batches retain their original snapshot,
3065 /// so earlier emits flush to their original sink list — the new sub
3066 /// does NOT double-receive them via flush AND handshake replay,
3067 /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
3068 ///
3069 /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
3070 /// Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
3071 /// paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
3072 /// COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
3073 /// flush immediately. START (tier 0) is per-subscription and never
3074 /// transits queue_notify.
3075 pub(crate) fn queue_notify(&self, s: &mut crate::node::St<'_>, node_id: NodeId, msg: Message) {
3076 // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
3077 // wave-content invariant assertion. The tier-3 slot at one node in
3078 // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
3079 // never multiple RESOLVED. Slice G moved equals substitution from
3080 // per-emit to wave-end coalescing; this assert pins that the
3081 // dispatcher itself never queues a violating combination at the
3082 // queue_notify granularity. Resolved arrivals come from:
3083 // 1. The auto-resolve sweep in `drain_and_flush` (gates on
3084 // `!any tier-3` so it can't add to a wave with Data).
3085 // 2. The wave-end equals-substitution pass (rewrites in place,
3086 // doesn't go through queue_notify).
3087 // Both honor R1.3.3.a by construction post-Slice-G.
3088 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
3089 // on per-thread WaveState. The dev-mode invariant assertion
3090 // borrows WaveState briefly and drops before the rest of
3091 // queue_notify proceeds.
3092 #[cfg(debug_assertions)]
3093 if matches!(msg.tier(), 3) {
3094 with_wave_state(|ws| {
3095 if let Some(entry) = ws.pending_notify.get(&node_id) {
3096 // Walk all batches' messages — R1.3.3.a is a per-node
3097 // wave-content invariant, not per-batch (the X4 batches
3098 // are subscriber-snapshot epochs; the protocol-level
3099 // tier-3 invariant spans the whole wave for the node).
3100 let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
3101 let resolved_count = entry
3102 .iter_messages()
3103 .filter(|m| matches!(m, Message::Resolved))
3104 .count();
3105 let incoming_is_data = matches!(msg, Message::Data(_));
3106 if incoming_is_data {
3107 debug_assert!(
3108 resolved_count == 0,
3109 "R1.3.3.a violation at {node_id:?}: queueing Data into a \
3110 wave that already contains Resolved — Slice G should have \
3111 prevented this via wave-end coalescing"
3112 );
3113 } else {
3114 debug_assert!(
3115 !has_data,
3116 "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
3117 wave that already contains Data"
3118 );
3119 debug_assert!(
3120 resolved_count == 0,
3121 "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
3122 wave at one node"
3123 );
3124 }
3125 }
3126 });
3127 }
3128
3129 let buffered_tier = matches!(msg.tier(), 3 | 4);
3130 let cap = s.shared.pause_buffer_cap;
3131
3132 // Pause-routing branch — handles its own retain/release and returns
3133 // before we touch `pending_notify`, so the rec borrow is contained.
3134 {
3135 let rec = s.require_node_mut(node_id);
3136 if rec.subscribers.is_empty() {
3137 return;
3138 }
3139 // Slice F audit close (2026-05-07); amended 2026-05-17 for
3140 // canonical §2.6 R2.6.0 ("Option A"). Pause routing depends on
3141 // mode:
3142 // - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
3143 // - `Default` + STATE node (leaf source — no deps): a state
3144 // node's value is intrinsic, NOT produced by an fn/dep
3145 // settle pipeline. PAUSE/RESUME gating is fn/dep-pipeline-
3146 // scoped only (R2.6.0). A leaf source that holds its own
3147 // pause lock and then self-emits via a direct external
3148 // `down([[DATA, v]])` is pushing OUTSIDE that pipeline, so
3149 // the DATA MUST flush immediately (cache advances now, no
3150 // PAUSE synthesized, nothing replayed on RESUME). Therefore
3151 // Default-mode state nodes do NOT buffer — they fall
3152 // through to the immediate queue path, matching the
3153 // `@graphrefly/pure-ts` reference (only `pausable:
3154 // "resumeAll"` buffers a leaf source's direct `down()`).
3155 // - `Default` + COMPUTE node: suppression happens upstream at
3156 // fn-fire scheduling (see `deliver_data_to_consumer`); no
3157 // outgoing tier-3 is produced from this node while paused,
3158 // so this branch is unreachable for compute-default-paused.
3159 // Fallthrough to the non-paused queue path is fine.
3160 // - `Off`: pause is ignored entirely — tier-3 flushes
3161 // immediately. Fallthrough.
3162 let mode_buffers_tier3 = match rec.pausable {
3163 crate::node::PausableMode::ResumeAll => true,
3164 crate::node::PausableMode::Default | crate::node::PausableMode::Off => false,
3165 };
3166 if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
3167 if let Some(h) = msg.payload_handle() {
3168 self.binding.retain_handle(h);
3169 }
3170 let push_result = rec.pause_state.push_buffered(msg, cap);
3171 for dm in push_result.dropped_msgs {
3172 if let Some(h) = dm.payload_handle() {
3173 self.binding.release_handle(h);
3174 }
3175 }
3176 // R1.3.8.c (Slice F, A3): on first overflow this cycle,
3177 // schedule a synthesized ERROR for wave-end emission.
3178 // `cap` is `Some` here (an overflow can only happen with a
3179 // configured cap), so `unwrap` is safe.
3180 if push_result.first_overflow_this_cycle {
3181 if let Some((dropped_count, lock_held_ns)) =
3182 rec.pause_state.overflow_diagnostic()
3183 {
3184 // Q-beyond Sub-slice 1 (D108, 2026-05-09):
3185 // pending_pause_overflow lives on per-thread WaveState.
3186 with_wave_state(|ws| {
3187 ws.pending_pause_overflow
3188 .push(crate::node::PendingPauseOverflow {
3189 node_id,
3190 dropped_count,
3191 configured_max: cap.unwrap_or(0),
3192 lock_held_ns,
3193 });
3194 });
3195 }
3196 }
3197 return;
3198 }
3199 }
3200
3201 // Non-paused queue path: retain payload handle and queue into
3202 // pending_notify. Released in `flush_notifications` after sinks
3203 // fire.
3204 if let Some(h) = msg.payload_handle() {
3205 self.binding.retain_handle(h);
3206 }
3207 Self::push_into_pending_notify(s, node_id, msg);
3208 }
3209
3210 /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
3211 /// non-paused path. Either appends `msg` to the open batch (if
3212 /// `subscribers_revision` hasn't advanced since it opened — common
3213 /// case, no extra allocation) or opens a fresh batch with a current
3214 /// sink snapshot frozen at the new revision.
3215 ///
3216 /// Borrow discipline: reads `subscribers_revision` and the snapshot
3217 /// from `s.nodes` BEFORE borrowing WaveState's `pending_notify` to
3218 /// keep the two scopes disjoint.
3219 ///
3220 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
3221 /// to per-thread WaveState. The state-side read of
3222 /// `subscribers_revision` / `subscribers` happens before the
3223 /// `with_wave_state` block opens, then the WaveState borrow
3224 /// performs the entry insertion / append. State lock + WaveState
3225 /// borrow remain independent.
3226 ///
3227 /// Lock-discipline assumption: this read of `subscribers_revision`
3228 /// is safe because both the subscribe install path
3229 /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
3230 /// `CoreState`'s mutex when they bump / read the revision —
3231 /// concurrent subscribe/unsubscribe cannot interleave. **If
3232 /// `Core::subscribe` ever moves the sink-install lock-released
3233 /// (mirroring the lock-released drain refactor), the revision read
3234 /// here must re-validate post-borrow — otherwise a fresh batch
3235 /// could open with a stale snapshot.**
3236 fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
3237 let current_rev = s.require_node(node_id).subscribers_revision;
3238 let needs_new_batch = with_wave_state(|ws| {
3239 ws.pending_notify.get(&node_id).is_none_or(|entry| {
3240 entry
3241 .batches
3242 .last()
3243 .is_none_or(|b| b.snapshot_revision != current_rev)
3244 })
3245 });
3246 let sinks_snapshot: SmallVec<[Sink; 1]> = if needs_new_batch {
3247 s.require_node(node_id)
3248 .subscribers
3249 .values()
3250 .cloned()
3251 .collect()
3252 } else {
3253 SmallVec::new()
3254 };
3255 with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
3256 Entry::Vacant(slot) => {
3257 let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
3258 batches.push(PendingBatch {
3259 snapshot_revision: current_rev,
3260 sinks: sinks_snapshot,
3261 messages: smallvec::smallvec![msg],
3262 });
3263 slot.insert(PendingPerNode { batches });
3264 }
3265 Entry::Occupied(mut slot) => {
3266 let entry = slot.get_mut();
3267 if needs_new_batch {
3268 entry.batches.push(PendingBatch {
3269 snapshot_revision: current_rev,
3270 sinks: sinks_snapshot,
3271 messages: smallvec::smallvec![msg],
3272 });
3273 } else {
3274 entry
3275 .batches
3276 .last_mut()
3277 .expect("non-empty by construction (entry exists implies batch exists)")
3278 .messages
3279 .push(msg);
3280 }
3281 }
3282 });
3283 }
3284
3285 /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
3286 /// payload-handle releases owed for `pending_notify` into
3287 /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
3288 /// run **after** the state lock is dropped — see [`Core::run_wave`].
3289 ///
3290 /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
3291 /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
3292 /// here as cross-node tier-then-node collect — phase 1's jobs sit before
3293 /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
3294 /// queue lock-released, multi-node subscribers see all DIRTYs before any
3295 /// settle. Matches TS's drainPhase model without the per-tier queue
3296 /// indirection.
3297 ///
3298 /// Phase ordering:
3299 /// 1 → tier 1 (DIRTY)
3300 /// 2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
3301 /// 3 → tier 5 (COMPLETE/ERROR)
3302 /// 4 → tier 6 (TEARDOWN)
3303 ///
3304 /// Tier 0 (START) is per-subscription (never enters pending_notify) and
3305 /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
3306 /// bypassing pending_notify; both are absent from this enumeration.
3307 ///
3308 /// Within a single phase, per-node insertion order (IndexMap iteration)
3309 /// is preserved — an emit on A before B → A's phase-2 messages flush
3310 /// before B's. Within a single node, message order is preserved.
3311 fn flush_notifications(&self, s: &mut CoreState) {
3312 const PHASES: &[&[u8]] = &[
3313 &[1], // DIRTY
3314 &[3, 4], // DATA/RESOLVED + INVALIDATE
3315 &[5], // COMPLETE/ERROR
3316 &[6], // TEARDOWN
3317 ];
3318 // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
3319 // deferred_handle_releases, and deferred_flush_jobs all live on
3320 // per-thread WaveState. Take pending_notify under the WaveState
3321 // borrow, drop the borrow, run the per-phase loop (no WaveState
3322 // access in the loop body), then re-borrow WaveState at the end
3323 // to push the collected jobs and payload-handle releases.
3324 //
3325 // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
3326 // currently unused inside the per-phase loop — `pending` was
3327 // moved off `s` to WaveState by sub-slice 2, and the per-batch
3328 // sink snapshot is already on the PendingBatch. Kept as a
3329 // parameter to preserve the caller's `let mut s = lock_state();
3330 // self.flush_notifications(&mut s);` invocation shape (caller
3331 // holds the state lock around this call — load-bearing for
3332 // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
3333 // released" marker; the lock guard belongs to the caller and
3334 // is held throughout this function. A future change that adds
3335 // an in-loop state read should remove the discard below;
3336 // removing the parameter would break the caller's ability to
3337 // express the lock-discipline contract at the call site.
3338 let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
3339 // D217-AMEND-2 (2026-05-16): recycle the per-wave `pending_notify`
3340 // map instead of `mem::take`-ing it. `mem::take` installed a
3341 // fresh `IndexMap::default()` every wave → a new `ahash::
3342 // RandomState` (entropy via `gen_hasher_seed`/`from_keys`) PLUS
3343 // RawVec realloc churn (`finish_grow`/`grow_one`) on the next
3344 // wave's `queue_notify`. Empirical attribution
3345 // (`examples/profile_st_emit.rs` + macOS `sample`): ~1250 of
3346 // ~4767 hot-path samples — the dominant §7 floor tax (D217
3347 // lever-1 "slab store" falsified; `require_node` was minor).
3348 // Fix: swap the live map with a persistent spare so the next
3349 // wave fills a capacity-retained, fixed-seed map; process this
3350 // wave's full map from the spare slot and `.clear()` it (retain
3351 // capacity + hasher; no drop, no realloc, no reseed). Zero
3352 // `IndexMap::default()` after thread init.
3353 //
3354 // The jobs-building loop now runs INSIDE the single WaveState
3355 // borrow. This is sound: the loop is pure (iteration +
3356 // `Arc::clone` + `Vec` collect; no re-entrant `with_wave_state`
3357 // / `lock_state`), and the caller holds the CoreState lock
3358 // throughout regardless — so R1.3.5.a per-tier
3359 // handshake-vs-flush ordering is unchanged. Refcount discipline
3360 // is unchanged: payload-handle releases are still collected into
3361 // `deferred_handle_releases` (released post-lock-drop by
3362 // `BatchGuard::drop`); `.clear()` runs the same element drops as
3363 // the old map-drop and the `PendingPerNode`/`Message` payloads
3364 // carry no refcount-releasing `Drop`.
3365 with_wave_state(|ws| {
3366 core::mem::swap(&mut ws.pending_notify, &mut ws.pending_notify_recycle);
3367 // ws.pending_notify = empty spare (next wave fills it)
3368 // ws.pending_notify_recycle = THIS wave's full map (below)
3369 let mut jobs: DeferredJobs = Vec::new();
3370 let mut releases: Vec<HandleId> = Vec::new();
3371 {
3372 let full = &ws.pending_notify_recycle;
3373 for &phase_tiers in PHASES {
3374 for (_node_id, entry) in full {
3375 // Slice X4 / D2: iterate batches in arrival
3376 // order. Each batch carries its own sink
3377 // snapshot frozen at open-time; a batch's
3378 // messages flush to ITS sinks only. Within a
3379 // single (phase, node), batches stay in arrival
3380 // order so emit-order semantics are preserved.
3381 for batch in &entry.batches {
3382 if batch.sinks.is_empty() {
3383 continue;
3384 }
3385 let phase_msgs: Vec<Message> = batch
3386 .messages
3387 .iter()
3388 .copied()
3389 .filter(|m| phase_tiers.contains(&m.tier()))
3390 .collect();
3391 if phase_msgs.is_empty() {
3392 continue;
3393 }
3394 let sinks_clone: Vec<Sink> =
3395 batch.sinks.iter().map(Rc::clone).collect();
3396 jobs.push((sinks_clone, phase_msgs));
3397 }
3398 }
3399 }
3400 // Refcount release balances the retain done in
3401 // `queue_notify` for every payload-bearing message that
3402 // landed in pending_notify (across ALL batches per
3403 // node); deferred to post-lock-drop so the binding's
3404 // release path can't re-enter Core under our lock.
3405 for entry in full.values() {
3406 for msg in entry.iter_messages() {
3407 if let Some(h) = msg.payload_handle() {
3408 releases.push(h);
3409 }
3410 }
3411 }
3412 }
3413 ws.deferred_flush_jobs.append(&mut jobs);
3414 ws.deferred_handle_releases.append(&mut releases);
3415 // Retain capacity + the existing ahash seed for next wave's
3416 // swap-in. No drop, no realloc, no reseed.
3417 ws.pending_notify_recycle.clear();
3418 });
3419 }
3420
3421 /// Take the deferred sink-fire jobs, payload-handle releases,
3422 /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
3423 /// Callers pair this with `drop(state_guard)` and a subsequent
3424 /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
3425 /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
3426 /// Q2(b) eager wipe_ctx fires lock-released.
3427 ///
3428 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
3429 /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
3430 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
3431 /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
3432 /// WaveState. The `_s: &mut CoreState` parameter is now unused but
3433 /// kept to preserve the call-site lock-discipline ordering (caller
3434 /// holds the state lock around this call to interleave with prior
3435 /// `clear_wave_state` per-NodeRecord work).
3436 pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
3437 (
3438 std::mem::take(&mut ws.deferred_flush_jobs),
3439 std::mem::take(&mut ws.deferred_handle_releases),
3440 std::mem::take(&mut ws.deferred_cleanup_hooks),
3441 std::mem::take(&mut ws.pending_wipes),
3442 )
3443 }
3444
3445 /// Fire deferred sink-fire jobs in collected order, then release the
3446 /// payload handles owed for messages that landed in `pending_notify`
3447 /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
3448 /// hooks. All three phases run lock-released so:
3449 /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
3450 /// state lock cleanly and run their own nested wave.
3451 /// - The binding's `release_handle` path can't deadlock against a
3452 /// binding-side mutex held by Core.
3453 /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
3454 /// may safely re-enter Core for unrelated nodes.
3455 ///
3456 /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
3457 /// is wrapped in `catch_unwind` so a single binding panic doesn't
3458 /// short-circuit the per-wave drain. All queued cleanup attempts run;
3459 /// if any panicked, the LAST panic re-raises after the loop completes
3460 /// (preserving wave-end discipline while still surfacing failures).
3461 /// Per D060, Core stays panic-naive about user code — bindings own
3462 /// their host-language panic policy inside `cleanup_for`; this
3463 /// `catch_unwind` is purely about drain-don't-short-circuit.
3464 pub(crate) fn fire_deferred(
3465 &self,
3466 jobs: DeferredJobs,
3467 releases: Vec<HandleId>,
3468 cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
3469 pending_wipes: Vec<crate::handle::NodeId>,
3470 ) {
3471 // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
3472 // `catch_unwind` so a panicking sink doesn't unwind out of
3473 // `fire_deferred` and drop the queued `releases` +
3474 // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
3475 // handshake-fire discipline. Without this guard, a sink panic
3476 // here would silently leak handle retains AND silently drop
3477 // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
3478 // we re-raise the last panic at the end after running every
3479 // queued fire — drain ordering is preserved.
3480 let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
3481 for (sinks, msgs) in jobs {
3482 for sink in &sinks {
3483 let sink = sink.clone();
3484 let msgs_ref = &msgs;
3485 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3486 sink(msgs_ref);
3487 }));
3488 if let Err(payload) = result {
3489 last_panic = Some(payload);
3490 }
3491 }
3492 }
3493 for h in releases {
3494 self.binding.release_handle(h);
3495 }
3496 // Slice E2 (D060): drain cleanup hooks with per-item panic
3497 // isolation so the loop always completes. AssertUnwindSafe is
3498 // safe here because we don't rely on logical state being valid
3499 // post-panic — the panic propagates anyway after the drain ends.
3500 for (node_id, trigger) in cleanup_hooks {
3501 let binding = &self.binding;
3502 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3503 binding.cleanup_for(node_id, trigger);
3504 }));
3505 if let Err(payload) = result {
3506 last_panic = Some(payload);
3507 }
3508 }
3509 // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
3510 // same per-item panic isolation. Fires AFTER cleanup hooks so a
3511 // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
3512 // fires in the same wave) sees pre-wipe binding state if it
3513 // landed in the same wave as the terminal cascade. Mutually
3514 // exclusive with `Subscription::Drop`'s direct-fire site, but
3515 // even concurrent fires are idempotent (binding's `wipe_ctx`
3516 // calls `HashMap::remove` which is a no-op on absent keys).
3517 for node_id in pending_wipes {
3518 let binding = &self.binding;
3519 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3520 binding.wipe_ctx(node_id);
3521 }));
3522 if let Err(payload) = result {
3523 last_panic = Some(payload);
3524 }
3525 }
3526 if let Some(payload) = last_panic {
3527 std::panic::resume_unwind(payload);
3528 }
3529 }
3530
3531 // -------------------------------------------------------------------
3532 // User-facing batch — coalesce multiple emits into one wave
3533 // -------------------------------------------------------------------
3534
3535 /// Coalesce multiple emissions into a single wave. Every `emit` /
3536 /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
3537 /// queues its downstream work; the wave drains when `f` returns.
3538 ///
3539 /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
3540 /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
3541 /// — repeated emits on the same node coalesce into a single multi-message
3542 /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
3543 /// per emit, all delivered together in the per-node phase-2 pass).
3544 ///
3545 /// Nested `batch()` calls share the outer wave; only the outermost call
3546 /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
3547 /// engine's own `in_tick` re-entrance) compose with this method
3548 /// transparently — they observe `in_tick = true` and skip drain just
3549 /// like nested `batch()`.
3550 ///
3551 /// On panic inside `f`, the `BatchGuard` returned by the internal
3552 /// `begin_batch` call drops normally and discards pending tier-3+ work
3553 /// (subscribers do not observe the half-built wave). See
3554 /// [`Core::begin_batch`] for the RAII variant if you need explicit control
3555 /// over the scope boundary.
3556 pub fn batch<F>(&self, f: F)
3557 where
3558 F: FnOnce(),
3559 {
3560 let _guard = self.begin_batch();
3561 f();
3562 }
3563
3564 /// RAII batch handle — opens a wave when constructed, drains on drop.
3565 ///
3566 /// Mirrors the closure-based [`Self::batch`] but exposes the scope
3567 /// boundary so callers can compose batches with non-`FnOnce` control
3568 /// flow (e.g. async-state-machine code paths, or splitting setup and
3569 /// drain across helper functions).
3570 ///
3571 /// ```
3572 /// use graphrefly_core::{Core, BindingBoundary, NodeRegistration, NodeOpts,
3573 /// HandleId, NodeId, FnId, FnResult, DepBatch};
3574 /// use std::sync::Arc;
3575 ///
3576 /// struct Stub;
3577 /// impl BindingBoundary for Stub {
3578 /// fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3579 /// FnResult::Noop { tracked: None }
3580 /// }
3581 /// fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3582 /// fn release_handle(&self, _: HandleId) {}
3583 /// }
3584 ///
3585 /// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3586 /// let state_a = core.register(NodeRegistration {
3587 /// deps: vec![], fn_or_op: None,
3588 /// opts: NodeOpts { initial: HandleId::new(1), ..Default::default() },
3589 /// }).unwrap();
3590 /// let state_b = core.register(NodeRegistration {
3591 /// deps: vec![], fn_or_op: None,
3592 /// opts: NodeOpts { initial: HandleId::new(2), ..Default::default() },
3593 /// }).unwrap();
3594 ///
3595 /// let g = core.begin_batch();
3596 /// core.emit(state_a, HandleId::new(10));
3597 /// core.emit(state_b, HandleId::new(20));
3598 /// drop(g); // wave drains here
3599 /// ```
3600 ///
3601 /// Like the closure form, nested `begin_batch` calls share the outer
3602 /// wave (only the outermost guard drains).
3603 ///
3604 #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3605 pub fn begin_batch(&self) -> BatchGuard<'_> {
3606 // D246/S2c: single-owner ⇒ a `Core` is driven by exactly one
3607 // thread, so there is no cross-thread wave serialization to
3608 // acquire (the §7 group/global wave-locks are deleted) and no
3609 // shard to route to (single shard always).
3610 self.begin_batch_with_guards()
3611 }
3612
3613 /// Begin a batch for a wave seeded at `seed`. Historically this
3614 /// acquired the per-partition `wave_owner` `ReentrantMutex`es for
3615 /// every partition transitively touched from `seed` (the Slice Y1
3616 /// parallelism win). **S2c/D248 deleted that machinery:** `Core` is
3617 /// single-owner `!Send + !Sync`, so a wave is one uninterrupted
3618 /// owner-side drain with nothing to lock. This now delegates
3619 /// directly to [`Self::begin_batch_with_guards`] (the all-`None`
3620 /// single-owner floor acquires nothing); the `seed` parameter is
3621 /// retained for the declared-group identity routing only.
3622 /// Cross-`Core` parallelism is host-native via independent
3623 /// per-worker Cores (actor model), not per-partition locks. D274
3624 /// (2026-05-21) deleted the always-`Ok` `try_begin_batch_for`
3625 /// shim that this used to delegate through.
3626 ///
3627 /// Slice Y1 / Phase E (2026-05-08); infallible-since S2c (D246);
3628 /// shim-collapsed by D274 (2026-05-21).
3629 #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3630 pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard<'_> {
3631 // D246/S2c: single-owner ⇒ no cross-thread wave serialization
3632 // and no shard routing. `seed` is unused now — the §7 touched-
3633 // group walk is deleted. D274 (2026-05-21): direct call to
3634 // `begin_batch_with_guards`; the always-`Ok` `try_begin_batch_for`
3635 // shim was deleted.
3636 let _ = seed;
3637 self.begin_batch_with_guards()
3638 }
3639
3640 // D274 (2026-05-21): `try_begin_batch_for` was deleted (was an
3641 // always-`Ok` shim). `begin_batch_for` calls `begin_batch_with_guards`
3642 // directly now.
3643
3644 /// Is this thread currently inside an owning wave on this Core?
3645 /// Reads the one-Core-per-thread [`IN_TICK_OWNED`] slot (D252) —
3646 /// `true` iff the slot holds this Core's `generation`. Read on the
3647 /// wave-owner thread (e.g. by `commit_emission` to decide cache-
3648 /// snapshot taking). `#[must_use]`: a discarded result silently
3649 /// loses the ownership/nesting decision (a classic predicate-misuse
3650 /// bug).
3651 #[must_use]
3652 pub(crate) fn in_tick(&self) -> bool {
3653 IN_TICK_OWNED.with(|s| s.get() == self.generation)
3654 }
3655
3656 /// Claim wave ownership for this Core on this thread (D252 "one
3657 /// Core per OS thread" hard invariant). Returns `true` iff this call
3658 /// is the outermost entry (slot was `0`) — i.e. `owns_tick`; `false`
3659 /// for nested same-Core re-entry (slot already holds our
3660 /// `generation`). **Panics fail-loud** if the slot holds any *other*
3661 /// nonzero generation — that is the D252-forbidden cross-Core
3662 /// owner-side nesting on a single thread (one OS thread mid-wave on
3663 /// Core-A entering a wave on Core-B). Under D248 single-owner Core
3664 /// the only call path that could produce it is a `DeferFn` capturing
3665 /// & driving a *second* `&Core`; no in-tree consumer does this, and
3666 /// adding one would violate the actor-model "one worker = one Core"
3667 /// framing — surfaced loudly here rather than silently masking the
3668 /// foreign Core's ownership.
3669 fn claim_in_tick(&self) -> bool {
3670 IN_TICK_OWNED.with(|s| {
3671 let cur = s.get();
3672 if cur == 0 {
3673 s.set(self.generation);
3674 true
3675 } else if cur == self.generation {
3676 false
3677 } else {
3678 // D252 hard invariant: one Core per OS thread. A nonzero
3679 // mismatch means a foreign Core's wave is live on this
3680 // thread — structurally forbidden under D248 single-
3681 // owner (a `DeferFn` driving a second `&Core` on the
3682 // same owner thread). Panic before nesting silently.
3683 //
3684 // Audience: **Rust-dev clarity only.** The
3685 // `@graphrefly/native` JS consumer does NOT see this
3686 // text — `core_actor.rs` M1's `catch_unwind(...)`
3687 // swallows the panic value before it crosses napi (JS
3688 // callers observe a sync_channel disconnect). The
3689 // polished message reaches Rust's panic hook → stderr
3690 // only. The BenchCore line below is preserved as a
3691 // hint for future readers of the panic-hook log; a
3692 // JS-friendly framing is aspirational pending a
3693 // panic→JS bridge slice (NOT committed — see
3694 // porting-deferred §panic-bridge). D258 (S7,
3695 // 2026-05-20) / D262/P4 amend / AMEND-D 2026-05-21 /
3696 // D277 (2026-05-22) text-shape consolidation.
3697 panic!(
3698 "GraphReFly invariant violated — cross-Core wave nesting \
3699 on a single OS thread (this Core's generation {self_gen}, \
3700 observed foreign generation {cur}). One Core per OS \
3701 thread (D248/D252). \
3702 \n\
3703 Rust callers: a `DeferFn` or owner-side seam appears to \
3704 be driving a *second* `&Core` mid-wave; the substrate \
3705 does not support cross-Core same-thread nesting. \
3706 (BenchCore: each instance MUST own its dedicated actor \
3707 thread — the S6/D255 invariant.) \
3708 \n\
3709 Internal reference: `docs/rust-port-decisions.md` \
3710 D248/D252/D255/D258.",
3711 self_gen = self.generation,
3712 );
3713 }
3714 })
3715 }
3716
3717 /// Release wave ownership for this Core on this thread. Called by the
3718 /// owning [`BatchGuard::drop`] only — after the `!owns_tick`
3719 /// early-return, so a nested guard never releases — explicitly at
3720 /// each of the three exit points, always AFTER the wave drain +
3721 /// WaveState cleanup and BEFORE `fire_deferred` (so a re-entrant sink
3722 /// emit runs as a fresh owning wave): (1) the closure-body-panic
3723 /// branch, (2) the drain-phase-panic `catch_unwind` arm (before
3724 /// `resume_unwind`), (3) the success path's locked cleanup block.
3725 /// Clears the [`IN_TICK_OWNED`] slot back to `0` (D252). Released
3726 /// exactly once per (Core, wave) on this thread; idempotent (a
3727 /// double-clear of `0` is a no-op).
3728 fn clear_in_tick(&self) {
3729 IN_TICK_OWNED.with(|s| {
3730 debug_assert!(
3731 s.get() == 0 || s.get() == self.generation,
3732 "BatchGuard::clear_in_tick: slot holds foreign \
3733 generation {observed} (this Core: {self_gen}) — \
3734 D252 invariant violated",
3735 observed = s.get(),
3736 self_gen = self.generation,
3737 );
3738 s.set(0);
3739 });
3740 }
3741
3742 /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`].
3743 /// D246/S2c: single-owner ⇒ no wave-owner / shard guards to carry.
3744 fn begin_batch_with_guards(&self) -> BatchGuard<'_> {
3745 // Claim wave ownership in the one-Core-per-OS-thread
3746 // [`IN_TICK_OWNED`] slot (D252; see its doc for the same-Core
3747 // nested-re-entry semantics and the cross-Core panic invariant)
3748 // — no state lock needed, since `in_tick` has no cross-thread
3749 // read requirement.
3750 let owns_tick = self.claim_in_tick();
3751 // D1 patch (2026-05-09): defensive wave-start clear of the
3752 // per-thread Slice G tier3 tracker on outermost owning entry.
3753 // The thread-local is cleared at outermost BatchGuard drop on
3754 // both success + panic paths; this start-clear is belt-and-
3755 // suspenders against panic paths that bypass Drop (catch_unwind
3756 // can interleave with thread reuse — e.g. cargo's test-runner
3757 // thread pool — and propagate stale entries from a prior
3758 // panicked test's wave that didn't fully unwind through
3759 // BatchGuard::drop).
3760 if owns_tick {
3761 tier3_clear();
3762 // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
3763 // clear of WaveState's non-retain-holding fields. Mirrors the
3764 // tier3 defensive-clear above. Retain-holding fields
3765 // (wave_cache_snapshots / deferred_handle_releases) MUST be
3766 // empty here — outermost BatchGuard::drop drains them on both
3767 // success + panic paths.
3768 wave_state_clear_outermost();
3769 }
3770 BatchGuard {
3771 core: self,
3772 owns_tick,
3773 _not_send: std::marker::PhantomData,
3774 }
3775 }
3776}
3777
/// RAII guard returned by [`Core::begin_batch`].
///
/// While alive, suppresses per-emit wave drains — multiple `emit` /
/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
/// wave. On drop:
/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
///   in-tick).
/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
///   the in-tick flag): silently no-ops.
///
/// On thread panic during the closure body, the drop path discards pending
/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
/// State-node cache writes that already executed inside the closure are
/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
/// panic value. The atomicity guarantee covers both sink-observability and
/// cache state.
///
/// # Thread safety
///
/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
/// one-Core-per-OS-thread `in_tick` ownership slot (D252) on the
/// calling thread; sending the guard to another thread and dropping it
/// there would clear `in_tick` against the wrong thread's slot,
/// breaking the "I own the wave scope" semantic. D246/S2c: single-owner
/// ⇒ the §7 per-partition `wave_owner` re-entrant mutex(es) are
/// deleted; `!Send` is now enforced solely by the
/// `PhantomData<*const ()>` marker.
///
/// ```compile_fail
/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
/// use std::sync::Arc;
///
/// struct Stub;
/// impl BindingBoundary for Stub {
/// fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
/// FnResult::Noop { tracked: None }
/// }
/// fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
/// fn release_handle(&self, _: HandleId) {}
/// }
/// fn requires_send<T: Send>(_: T) {}
/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
/// let guard = core.begin_batch();
/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
/// ```
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard<'a> {
    // S2b (D223): borrows `&'a Core` — `Core` is no longer `Clone`
    // (owned-by-value, relocates between workers). The guard lives
    // entirely within the wave scope of the `&self` that produced it
    // (`begin_batch*` takes `&self`), so `self` strictly outlives the
    // guard — identical to S1's `FiringGuard<'a>`. `!Send` via
    // `_not_send`.
    core: &'a Core,
    // Result of the `claim_in_tick` call made when the guard was built:
    // `true` iff this guard was the outermost entry. `Drop` returns
    // immediately when this is `false`, so only the owning guard drains
    // the wave and releases `in_tick`.
    owns_tick: bool,
    /// D246/S2c: single-owner ⇒ no per-partition wave-owner guards and
    /// no ambient shard-key guard (both were shared-Core-era §7
    /// machinery, now deleted). `BatchGuard` stays `!Send` purely via
    /// this `PhantomData<*const ()>` — a wave guard is wave-scoped to
    /// the one owner thread and must not cross threads.
    _not_send: std::marker::PhantomData<*const ()>,
}
3841
3842impl BatchGuard<'_> {
3843 /// Panic-discard cleanup for the owning guard: drop pending wave
3844 /// work, release queued payload + handle retains lock-released,
3845 /// restore pre-wave cache snapshots, clear per-thread `WaveState` +
3846 /// the Slice-G tier3 tracker, and discard deferred producer ops.
3847 ///
3848 /// Shared by BOTH panic origins so a drain-phase fn/sink panic gets
3849 /// the identical `BatchGuard` atomicity guarantee as a closure-body
3850 /// panic: (1) the `std::thread::panicking()` branch (panic propagated
3851 /// from the wave's *closure body* — drop runs during that unwind),
3852 /// and (2) the success-path `catch_unwind` around `drain_and_flush()`
3853 /// (a fn/sink panic that escaped the inner per-call `catch_unwind`
3854 /// isolation while drop was NOT already unwinding). /qa D047.
3855 ///
3856 /// Does NOT release `in_tick` — each `BatchGuard::drop` exit path
3857 /// calls `clear_in_tick()` explicitly, after this cleanup and before
3858 /// `fire_deferred` (so a re-entrant sink emit runs as a fresh owning
3859 /// wave).
3860 fn discard_wave_cleanup(&self) {
3861 let (
3862 pending,
3863 pending_recycle,
3864 deferred_releases,
3865 restored_releases,
3866 restored_terminal_releases,
3867 ) = {
3868 let mut s = self.core.lock_state();
3869 // WaveState borrowed alongside state for panic-discard
3870 // cleanup. The WaveState borrow is per-thread, independent of
3871 // state. `in_tick` is the one-Core-per-OS-thread
3872 // [`IN_TICK_OWNED`] slot (D252), released separately by the
3873 // explicit `clear_in_tick` on each
3874 // exit path; this method only drains/cleans the per-thread
3875 // WaveState retain-fields.
3876 with_wave_state(|ws| {
3877 let pending = std::mem::take(&mut ws.pending_notify);
3878 // D217-AMEND-2 / QA: `pending_notify_recycle` is the
3879 // ONE retain-capable WaveState field whose retains live
3880 // in a *persistent* slot (not an owned local that drops
3881 // on unwind). On the success path it is empty here
3882 // (`flush_notifications` cleared it). But if a wave
3883 // panic-discards mid-`flush_notifications` AFTER the
3884 // swap but before `.clear()`, this slot holds the full
3885 // wave's payload-retaining map while `pending_notify` is
3886 // the empty spare. Drain it symmetrically so the same
3887 // "every retain field released on the panic path"
3888 // invariant the sibling fields uphold also covers
3889 // recycle (closes the leak + the stale-entry-injected-
3890 // into-next-wave hazard; success path: this is a no-op
3891 // take of an already-empty map).
3892 let pending_recycle = std::mem::take(&mut ws.pending_notify_recycle);
3893 let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
3894 ws.pending_fires.clear();
3895 let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
3896 // D291: restore terminal slots that transitioned during
3897 // this wave (closes Case 5 — R4.3.2 status-snapshot
3898 // completeness). Returns ERROR-tier handles for
3899 // lock-released release; the terminal slots own those
3900 // retains pre-restore, so restore_terminal transfers
3901 // ownership into this vec.
3902 let restored_terminal = self.core.restore_wave_terminal_snapshots(&mut s, ws);
3903 // clear_wave_state pushes batch-handle releases into
3904 // ws.deferred_handle_releases, so take ws's queue AFTER
3905 // the clear.
3906 s.clear_wave_state(ws);
3907 // Step 2a (D220-EXEC): defensive `currently_firing`
3908 // clear, relocated here from `CoreState::clear_wave_state`
3909 // (the field moved to the separate `CoreShared` region;
3910 // `St`'s `.shared` reaches it, a `&mut CoreState` can't).
3911 // Same wave-end point; `FiringGuard` RAII already
3912 // balances push/pop — this is the belt-and-suspenders
3913 // net for a future guard-bypassing path.
3914 s.shared.currently_firing.clear();
3915 ws.clear_wave_state();
3916 let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
3917 // Slice E2 (D061): panic-discard wave drops queued
3918 // OnInvalidate cleanup hooks SILENTLY. Bindings using
3919 // OnInvalidate for external-resource cleanup MUST
3920 // idempotent-cleanup at process exit / next successful
3921 // invalidate. Mirrors A3 `pending_pause_overflow`
3922 // panic-discard precedent.
3923 let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
3924 std::mem::take(&mut ws.deferred_cleanup_hooks);
3925 // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
3926 // for the eager-wipe queue. A panic-discarded wave drops
3927 // queued `wipe_ctx` fires silently; the binding-side
3928 // `NodeCtxState` entry remains until the next successful
3929 // terminate-with-no-subs cycle (or until `Core` drops).
3930 // This mirrors D061's external-resource-cleanup gap and
3931 // is documented similarly.
3932 let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
3933 (
3934 pending,
3935 pending_recycle,
3936 deferred_releases,
3937 restored,
3938 restored_terminal,
3939 )
3940 })
3941 };
3942 // Lock dropped — release retains lock-released so the binding
3943 // can't deadlock against an internal binding mutex.
3944 for entry in pending.values() {
3945 for msg in entry.iter_messages() {
3946 if let Some(h) = msg.payload_handle() {
3947 self.core.binding.release_handle(h);
3948 }
3949 }
3950 }
3951 // Symmetric with the `pending` loop above (D217-AMEND-2 / QA):
3952 // releases the full-map retains stranded in the recycle slot by
3953 // a panic between `flush_notifications`'s swap and clear. Empty
3954 // (no-op) on every non-panic path.
3955 for entry in pending_recycle.values() {
3956 for msg in entry.iter_messages() {
3957 if let Some(h) = msg.payload_handle() {
3958 self.core.binding.release_handle(h);
3959 }
3960 }
3961 }
3962 for h in deferred_releases {
3963 self.core.binding.release_handle(h);
3964 }
3965 for h in restored_releases {
3966 self.core.binding.release_handle(h);
3967 }
3968 // D291: release ERROR-tier handles transferred out of restored
3969 // terminal slots. Same lock-released discipline as
3970 // `restored_releases` above.
3971 for h in restored_terminal_releases {
3972 self.core.binding.release_handle(h);
3973 }
3974 // D1 patch (2026-05-09): clear the per-thread Slice G tier3
3975 // tracker on outermost wave-end (panic-discard path). The
3976 // thread-local outlives the BatchGuard otherwise — cargo's
3977 // thread reuse across tests would propagate stale entries.
3978 tier3_clear();
3979 // §7 (D208–D211): the panic-path "discard deferred producer
3980 // ops" block is DELETED. There is no `deferred_producer_ops`
3981 // queue — producer ops execute immediately
3982 // (`Core::push_deferred_producer_op`), so on a panic-discard
3983 // there is nothing queued to release/drop.
3984 }
3985}
3986
/// D260: cap on the outer wave-end drain-to-quiescence loop. Realistic
/// legitimate cascades stay ≤3 passes; 32 gives 10× headroom and
/// panics loudly on a real emit-loop. The cap is checked at the top of
/// every pass of the loop in the `Drop` impl for [`BatchGuard`].
const D260_MAX_REDRAIN_PASSES: u32 = 32;
3991
3992impl Drop for BatchGuard<'_> {
3993 fn drop(&mut self) {
3994 if !self.owns_tick {
3995 // Nested / non-owning guard: never claimed ownership, so it
3996 // must never release it. The owning guard's RAII releaser
3997 // (below) is the single clear site.
3998 return;
3999 }
4000 // Wave-ownership (`in_tick`) release discipline. `clear_in_tick`
4001 // must run AFTER the wave's drain + WaveState cleanup but BEFORE
4002 // `fire_deferred` (sinks), on every exit path:
4003 //
4004 // - **Before `fire_deferred` (load-bearing):** a sink re-entering
4005 // `Core::emit` / `complete` from a flush callback must run as a
4006 // fresh OWNING wave (so its own emissions drain + deliver). If
4007 // `in_tick` were still owned during `fire_deferred`, that
4008 // re-entrant emit would be a non-owning no-op and its data
4009 // silently lost (regression caught by
4010 // `lock_discipline::sink_can_reenter_core_via_emit`). This is
4011 // why each path clears explicitly at the right point — NOT via
4012 // an end-of-`drop` RAII guard (which would clear *after*
4013 // `fire_deferred`).
4014 // - **/qa hardening (D047):** a fn/sink panic in the drain phase
4015 // can escape the per-call `catch_unwind` isolation (e.g. a
4016 // derived fn panicking when fired). Drop is NOT already
4017 // unwinding, so it would otherwise skip BOTH the WaveState
4018 // drain (→ next owning wave trips `wave_state_clear_outermost`)
4019 // AND the `in_tick` clear (pre-D047 the explicit clear had this
4020 // same window). Catching the drain panic, running the shared
4021 // discard cleanup + `clear_in_tick`, then `resume_unwind` gives
4022 // a drain-phase panic the identical atomicity as a
4023 // closure-body panic.
4024 if std::thread::panicking() {
4025 // Closure-body panic — drop runs during that unwind. Discard
4026 // pending wave work (don't fire sinks mid-unwind — a sink
4027 // panic then aborts the process), release queued retains,
4028 // restore caches, then release ownership.
4029 self.discard_wave_cleanup();
4030 self.core.clear_in_tick();
4031 return;
4032 }
4033 // D260 / S7 (2026-05-20): wave-end drain-to-quiescence past
4034 // `fire_deferred`. The S2b/D232-AMEND contract is "the drain
4035 // loop applies mailbox/DeferQueue ops **in-wave, immediately**";
4036 // pre-D260 the BatchGuard ended after one `fire_deferred` pass,
4037 // so posts made by sinks firing **inside** `fire_deferred`
4038 // (post-`drain_and_flush`-exit) were stranded until the next
4039 // external wave entry. D260 completes the D232-AMEND promise:
4040 // loops the full drain → extract → clear-in_tick → fire_deferred
4041 // → release cycle until BOTH `mailbox` AND `deferred` quiesce.
4042 //
4043 // **Primary same-thread case (the actual bug surface).** Post-
4044 // S2b/S2c, producer-build sinks (e.g. `graphrefly_operators::
4045 // buffer::buffer`'s `notifier_sink`, the reactive-log `view`'s
4046 // internal sink) capture `MailboxEmitter` / `SinkEmitter` and
4047 // emit via `mailbox.post_emit(...)` instead of the pre-S2b
4048 // pre-S2b direct `Core::emit` (no mailbox). The post is supposed
4049 // to be drained by the wave's `drain_and_flush` loop (top-of-
4050 // iteration `is_runnable()` check), but `fire_deferred` runs
4051 // AFTER `drain_and_flush` exits. Same-thread same-wave: a
4052 // producer-build sink in `fire_deferred` posts to mailbox →
4053 // stranded → 35 parity failures (buffer / stratify / higher-
4054 // order / zip / control / messaging — all uniform "emissions
4055 // lost"). The cross-thread autonomous-timer-task case (D227/
4056 // D230) is the secondary motivation for `MailboxEmitter`'s
4057 // `Send + Sync` shape, but the in-tree bug surface was the
4058 // same-thread sink case D260 plugs.
4059 //
4060 // **Why iteration not nested recursion.** Pre-S2b, sinks
4061 // captured `Core` directly and synchronous emit ran a
4062 // *nested wave* per emit (recursive RAII; depth-first). D260's
4063 // iteration at the wave-end frontier coalesces all post-
4064 // `fire_deferred` mailbox ops into one outer wave (breadth-
4065 // first at the boundary; fewer nested-wave entries; identical
4066 // quiescence semantics).
4067 //
4068 // **Canonical timing convergence (A3, user-locked 2026-05-20).**
4069 // Rust IS the canonical for this drain-to-quiescence shape;
4070 // TS/PY MUST converge if/when they expose an equivalent
4071 // mailbox-posting sink seam. Pure-ts today implements the same
4072 // observable behavior via a direct-call sink path (no mailbox
4073 // indirection → no "stranded post" hazard → naturally quiescent
4074 // at wave end); cross-arm parity scenarios (buffer, view(slice),
4075 // stratify, ...) verify the observable agreement empirically.
4076 //
4077 // **Bounded-iteration:** D260's outer loop is capped at
4078 // `D260_MAX_REDRAIN_PASSES` (32) — realistic legitimate
4079 // cascades stay ≤3 (each iteration drains a fresh post produced
4080 // by the previous fire_deferred); 32 gives 10× headroom and
4081 // panics loudly on a real emit-loop. Per-iteration
4082 // `drain_and_flush` is independently capped by
4083 // `max_batch_drain_iterations` (configurable via
4084 // `Core::set_max_batch_drain_iterations`). A user sink that
4085 // emit-loops forever was a stack overflow pre-S2b — same
4086 // hazard, now iteration-shaped (no stack growth, bounded by
4087 // the cap, fail-loud).
4088 let mut redrain_passes: u32 = 0;
4089 loop {
4090 redrain_passes += 1;
4091 assert!(
4092 redrain_passes <= D260_MAX_REDRAIN_PASSES,
4093 "D260: wave-end drain-to-quiescence loop did not converge in \
4094 {D260_MAX_REDRAIN_PASSES} passes (mailbox_runnable={mb}, \
4095 deferred_runnable={df}). A producer-build sink is likely \
4096 in an emit-loop: each `fire_deferred` pass posts a fresh \
4097 `MailboxOp` that retriggers another `fire_deferred` pass. \
4098 Pre-S2b this would have been a stack overflow; D260 \
4099 iteration-shapes the hazard with this cap. Investigate \
4100 the producer-build sink that consumes its own output.",
4101 mb = self.core.mailbox.is_runnable(),
4102 df = self.core.deferred.is_runnable(),
4103 );
4104 if let Err(payload) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4105 self.core.drain_and_flush();
4106 })) {
4107 self.discard_wave_cleanup();
4108 self.core.clear_in_tick();
4109 std::panic::resume_unwind(payload);
4110 }
4111 // Wave cleanup + extract deferred jobs under the lock.
4112 let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
4113 let mut s = self.core.lock_state();
4114 // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState
4115 // borrowed alongside state for wave-end cleanup. Per-thread;
4116 // independent of state. Sub-slice 3 moved deferred_* drains
4117 // into WaveState. /qa F1+F2 (2026-05-10) reverted in_tick +
4118 // currently_firing back to CoreState — clear via
4119 // CoreState::clear_wave_state under the held state lock.
4120 let result = with_wave_state(|ws| {
4121 s.clear_wave_state(ws);
4122 // Step 2a (D220-EXEC): defensive `currently_firing`
4123 // clear, relocated here from `CoreState::clear_wave_state`
4124 // (the field moved to the separate `CoreShared` region;
4125 // `St`'s `.shared` reaches it, a `&mut CoreState` can't).
4126 // Same wave-end point; `FiringGuard` RAII already
4127 // balances push/pop — this is the belt-and-suspenders
4128 // net for a future guard-bypassing path.
4129 s.shared.currently_firing.clear();
4130 ws.clear_wave_state();
4131 // /qa A1 (2026-05-09) discipline preserved: drain snapshot
4132 // retains under lock, release lock-released below to avoid
4133 // binding re-entrance under held mutex / borrow.
4134 let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
4135 // D291: success-path drain for terminal-slot snapshot
4136 // sets. The snapshot sets hold NO retains (slot owns
4137 // the ERROR-handle retain pre-snapshot AND post-commit
4138 // — the slot keeps it across the wave on commit); we
4139 // just clear the bookkeeping sets so the next wave
4140 // starts clean. Mirrors the placement of
4141 // `drain_wave_cache_snapshots`.
4142 Core::drain_wave_terminal_snapshots(ws);
4143 // `drain_deferred` takes `deferred_flush_jobs` +
4144 // `deferred_handle_releases` (incl. rotation releases pushed
4145 // by `clear_wave_state` above) + Slice E2
4146 // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
4147 // `pending_wipes` — all from WaveState post-Sub-slice-3.
4148 let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
4149 (jobs, releases, hooks, wipes, snapshot_releases)
4150 });
4151 // Release wave ownership now — AFTER drain + WaveState
4152 // cleanup, BEFORE `fire_deferred` below. Load-bearing: a sink
4153 // re-entering Core from a flush callback must observe
4154 // `in_tick` clear so its emit runs as a fresh owning wave.
4155 // (Mirrors the placement of the pre-D047 `s.in_tick = false`;
4156 // the drain-phase-panic window that placement had is closed
4157 // by the `catch_unwind` above.)
4158 self.core.clear_in_tick();
4159 result
4160 };
4161 // Lock dropped — fire deferred sinks + release retains + fire
4162 // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
4163 // + fire eager wipes (D069).
4164 //
4165 // D260 /qa P1 (2026-05-20): wrap `fire_deferred` in `catch_unwind`
4166 // so a sink panic inside this iteration (`fire_deferred`'s own
4167 // `last_panic` + `resume_unwind`-at-end discipline propagates a
4168 // panic out) doesn't bypass the per-iteration post-`fire_deferred`
4169 // cleanup (snapshot_releases at line ~3910, plus the outer
4170 // wave-end `tier3_clear` after the loop). Mirrors the L3844
4171 // drain-phase wrap. On panic: release the queued
4172 // `snapshot_releases` defensively lock-released, run the outer
4173 // wave-end finalization (`tier3_clear`), then `resume_unwind` so
4174 // the caller observes the panic. (D274 deleted the matching
4175 // `drain_deferred_producer_ops()` call that pre-D274 ran here as
4176 // a documented but no-op shim.)
4177 let fire_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4178 self.core
4179 .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
4180 }));
4181 // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
4182 // lock-released. Pre-A1 these were released inside the held
4183 // state + cross_partition locks; binding finalizers re-entering
4184 // Core would deadlock against either mutex. Drained earlier
4185 // under the lock; released here after both mutexes dropped and
4186 // sinks have fired. Runs even on `fire_deferred` panic — these
4187 // retains were lifted out of the wave state already and need
4188 // releasing regardless.
4189 for h in snapshot_releases {
4190 self.core.binding.release_handle(h);
4191 }
4192 if let Err(payload) = fire_result {
4193 // /qa P1: run wave-end finalization defensively before
4194 // propagating the panic. tier3_clear avoids stale-entry
4195 // leakage across cargo's thread-reuse (mirrors the
4196 // wave-start defensive clear in `begin_batch_with_guards`).
4197 // (D274 deleted `drain_deferred_producer_ops()` here — it
4198 // was a no-op shim per D211; no producer-deferred queue
4199 // remains to drain.)
4200 tier3_clear();
4201 std::panic::resume_unwind(payload);
4202 }
4203 // D260: check if `fire_deferred` posted new work to mailbox/
4204 // deferred. Quiescent ⇒ done. Else: re-claim `in_tick` (cleared
4205 // above, before `fire_deferred`) and loop the full sequence.
4206 if !self.core.mailbox.is_runnable() && !self.core.deferred.is_runnable() {
4207 break;
4208 }
4209 // Re-claim wave ownership for the secondary drain pass. The
4210 // previous iteration's `clear_in_tick` ran before its
4211 // `fire_deferred`, so the slot is `0` here. `claim_in_tick`
4212 // panics fail-loud on cross-Core mismatch (D252); a
4213 // same-thread same-Core re-claim must succeed-outermost.
4214 let owns = self.core.claim_in_tick();
4215 debug_assert!(
4216 owns,
4217 "D260: secondary drain pass should always succeed-outermost \
4218 (in_tick was cleared by the previous iteration's pre-fire_deferred clear)"
4219 );
4220 // /qa P2 (release-build safety): if `claim_in_tick` silently
4221 // returned `false` (slot already held — should be impossible
4222 // by-construction under D255 single-owner / D248 actor model,
4223 // but defense-in-depth), break the loop cleanly rather than
4224 // running `drain_and_flush` without ownership. A subsequent
4225 // outer wave entry will catch the still-runnable mailbox.
4226 if !owns {
4227 break;
4228 }
4229 }
4230 // D1 patch (2026-05-09): clear the per-thread Slice G tier3
4231 // tracker at outermost wave-end (success path). Mirrors the
4232 // panic-discard branch above. Thread-local outlives BatchGuard
4233 // by default; cargo's thread-reuse across tests would propagate
4234 // stale entries. Cleared after sinks fire (sink callbacks may
4235 // re-enter Core via emit and could read the tier3 set
4236 // mid-wave; the wave is over here so clearing is safe).
4237 tier3_clear();
4238 // D246/S2c: no per-partition wave-owner guards to release
4239 // (single-owner ⇒ the §7 wave-locks are deleted).
4240 // Phase H+ STRICT (D115): drain deferred producer ops at
4241 // wave-end (now a no-op shim — §7 deleted the deferred-producer
4242 // queue; retained so call sites compile unchanged, D211).
4243 }
4244}