graphrefly_core/batch.rs
//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
//!
//! Ports the wave-engine portion of the handle-protocol prototype
//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
//! Sibling to [`super::node`]; the dispatcher's other concerns
//! (registration, subscription, pause/resume, terminal cascade,
//! `set_deps`) live there.
//!
//! # Wave engine entry points
//!
//! - [`Core::run_wave`] — wave entry. Claims wave ownership (`in_tick`,
//!   now tracked per-(Core, thread) in `IN_TICK_OWNED` rather than under
//!   the state lock), runs `op` lock-released, then drains all transitive
//!   fn-fires and flushes per-subscriber notifications. Each fn-fire
//!   iteration drops the state lock around `BindingBoundary::invoke_fn`
//!   so user fn callbacks can re-enter Core safely.
//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
//!   the state lock per iteration around `invoke_fn`.
//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
//!   queueing + child propagation. `&self`-only; bracket-fires
//!   `BindingBoundary::custom_equals` lock-released.
//! - [`Core::queue_notify`] — per-subscriber message queueing with
//!   pause-buffer routing. Snapshots the subscriber list the first time
//!   it touches a node in a wave (and again whenever that node's
//!   `subscribers_revision` advances), so late subscribers installed
//!   mid-wave between drain iterations don't receive duplicate
//!   deliveries from messages already queued before they subscribed.
//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
//!   the consumer for fn-fire if its tracked-deps set is satisfied.
//!   Called from `commit_emission`, plus `activate_derived` and
//!   `set_deps` in [`super::node`].
//!
//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
//!
//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
//!   discipline).
//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
//!   wave (the existing `in_tick` re-entrance gate composes
//!   transparently).
//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
//!   check fires lock-released.
//! - **Subscribe-time handshake** is the one remaining lock-held callback.
//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
//!   tier-split. Re-entrance from a handshake sink callback panics with
//!   the [`reentrance_guard`] diagnostic.

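/*
Illustrative sketch (hypothetical; not part of this crate): the
lock-released callback discipline described above, reduced to a
`std::sync::Mutex` standing in for the crate's state lock. `Engine`,
`Callback`, `register`, `bump`, and `fire_all` are illustrative names, not
Core's API. The callback list is cloned under the lock and the guard is
dropped before any callback runs, so a callback can re-enter the engine
(here via `bump`) without blocking on a lock its caller still holds.

```rust
use std::sync::{Arc, Mutex};

// Illustrative callback type; the crate routes user code through `BindingBoundary`.
pub type Callback = Arc<dyn Fn(&Engine) + Send + Sync>;

pub struct Engine {
    state: Mutex<State>,
}

struct State {
    callbacks: Vec<Callback>,
    counter: u64,
}

impl Engine {
    pub fn new() -> Self {
        Engine {
            state: Mutex::new(State { callbacks: Vec::new(), counter: 0 }),
        }
    }

    pub fn register(&self, cb: Callback) {
        self.state.lock().unwrap().callbacks.push(cb);
    }

    /// A re-entrant entry point: takes the lock only briefly.
    pub fn bump(&self) {
        self.state.lock().unwrap().counter += 1;
    }

    pub fn counter(&self) -> u64 {
        self.state.lock().unwrap().counter
    }

    /// Snapshot the callbacks under the lock, drop the guard, then invoke.
    pub fn fire_all(&self) {
        let snapshot: Vec<Callback> = {
            let guard = self.state.lock().unwrap();
            guard.callbacks.clone()
        }; // `guard` is dropped here, so callbacks run lock-released
        for cb in snapshot {
            cb(self); // may call `self.bump()` without deadlocking
        }
    }
}
```

Holding the guard across `cb(self)` instead would make the first `bump`
never return (a std `Mutex` is not re-entrant; it deadlocks or panics),
which is the failure mode the lock-released discipline avoids.
*/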
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

use ahash::AHashSet;
use indexmap::map::Entry;
use indexmap::IndexMap;

use smallvec::SmallVec;

use crate::boundary::{DepBatch, FnEmission, FnResult};
use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
use crate::message::Message;
use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};

// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
//
// **Wave scope = thread-local.** GraphReFly's wave-engine guarantees
// that every emit at a given node within a single wave runs on the
// same thread (the thread that holds the partition's `wave_owner`
// `parking_lot::ReentrantMutex` — cross-thread emits at a node BLOCK
// on that mutex and so always land in the OTHER thread's wave). A
// wave is bounded above by the outermost `BatchGuard` drop on its
// originating thread. Together this means a per-thread
// `AHashSet<NodeId>` is the natural placement for "has node X already
// emitted a tier-3 message in this wave?" — the set's lifetime
// exactly matches the wave's, with no cross-thread or cross-wave
// contamination.
//
// **History (D1 patch, 2026-05-09):** previously placed on
// `crate::subgraph::SubgraphLockBox::state` per-partition (Q3 v1).
// That placement was robust to per-partition wave parallelism but
// vulnerable to mid-wave cross-thread `set_deps` partition splits:
// thread A is mid-wave on partition P (wave_owner held) but between
// fn fires (`currently_firing` empty); thread B's `set_deps` acquires
// the state lock, P13's `currently_firing.is_empty()` check
// short-circuits, the split proceeds, and X migrates from P to a
// fresh orphan-side partition with an empty
// `tier3_emitted_this_wave`. Thread A's subsequent emit at X then
// mis-detects "first emit" and queues a Resolved alongside the prior
// Data — R1.3.3.a violation. Thread-local placement is immune to this
// hazard: thread B's split doesn't touch thread A's thread-local at
// all.
//
// **Lifecycle:** populated by `Core::commit_emission` /
// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
// `BatchGuard` drop on this thread (both success and panic-discard
// paths). Re-entrant nested waves on the same thread share the set —
// inner-wave emits add to the same set; the outermost drop is the
// canonical clear point. Cross-thread emits NEVER touch this thread's
// set (they serialize on the partition wave_owner; the cross-thread
// emit happens in the OTHER thread's emit-loop and uses the OTHER
// thread's tier3 thread-local).
thread_local! {
    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}

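/*
Illustrative sketch (hypothetical; not part of this crate): a per-thread
"first tier-3 emit in this wave?" tracker in the spirit of
`TIER3_EMITTED_THIS_WAVE`, using `u32` ids and `std::collections::HashSet`
instead of `NodeId` and `AHashSet`. `note_tier3_emit` and
`end_outermost_wave` are illustrative helpers; the crate keeps the query
and the mark as separate `tier3_check` / `tier3_mark` functions.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

thread_local! {
    // One set per OS thread; its contents live exactly as long as the
    // current outermost wave on that thread.
    static EMITTED: RefCell<HashSet<u32>> = RefCell::new(HashSet::new());
}

/// Records a tier-3 emit at `node` and returns `true` only for the first
/// such emit at that node in the current wave on this thread.
pub fn note_tier3_emit(node: u32) -> bool {
    EMITTED.with(|s| s.borrow_mut().insert(node))
}

/// Called once, when the outermost wave on this thread ends.
pub fn end_outermost_wave() {
    EMITTED.with(|s| s.borrow_mut().clear());
}
```

`HashSet::insert` returns `true` only when the id was absent, so one call
both answers the question and records the emit.
*/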
// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
//
// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
//   identical to thread_local borrow_mut. The "mutex hop is slow" intuition
//   is wrong UNCONTENDED.
// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
//   than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
//   cache-line bouncing on the lock state itself.
// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
//   dominated by cache-line bouncing across cores, NOT by single-thread
//   mutex acquire overhead. Moving the four wave-scoped fields to a
//   per-thread thread_local eliminates the bounce point entirely.
//
// **Wave scope = thread, same as `TIER3_EMITTED_THIS_WAVE`:** every emit
// in a wave runs on the thread that holds the partition wave_owner;
// cross-thread emits BLOCK on wave_owner and so always land in the OTHER
// thread's wave context with the OTHER thread's WAVE_STATE. Mid-wave
// cross-thread `set_deps` partition splits don't touch this thread's
// thread-local at all (D1 lesson, applied here).
//
// **Lifecycle:** populated by `Core::commit_emission` /
// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
// releases any retained handles still in `wave_cache_snapshots` /
// `deferred_handle_releases`. Defensive wave-start clear at outermost
// owning BatchGuard entry guards against cargo's thread-reuse propagating
// stale entries from a prior panicked-mid-wave test.
thread_local! {
    static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
}

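/*
Illustrative sketch (hypothetical; not part of this crate): why a mid-wave
operation on another thread cannot disturb this thread's wave state. Each
OS thread gets its own instance of a `thread_local!`, so entries staged on
one thread are invisible to another. `PendingState`, `stage_fire`,
`pending_len`, and `len_seen_by_other_thread` are illustrative stand-ins
for `WaveState` and its accessors.

```rust
use std::cell::RefCell;
use std::thread;

#[derive(Default)]
pub struct PendingState {
    pub pending_fires: Vec<u32>, // stand-in for one wave-scoped field
}

thread_local! {
    static STATE: RefCell<PendingState> = RefCell::new(PendingState::default());
}

pub fn stage_fire(node: u32) {
    STATE.with(|s| s.borrow_mut().pending_fires.push(node));
}

pub fn pending_len() -> usize {
    STATE.with(|s| s.borrow().pending_fires.len())
}

/// Stages `n` fires on a freshly spawned thread and reports how many
/// entries that thread's own copy of `STATE` holds afterwards.
pub fn len_seen_by_other_thread(n: u32) -> usize {
    thread::spawn(move || {
        for i in 0..n {
            stage_fire(i);
        }
        pending_len()
    })
    .join()
    .unwrap()
}
```

The spawned thread starts from an empty `STATE` and leaves the calling
thread's entries untouched, which is the property the comment above relies
on for mid-wave cross-thread `set_deps` splits.
*/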
// Wave-ownership flag, keyed per-(Core, thread). Membership of a
// `Core::generation` value means "this thread is currently inside an
// OWNING wave on that Core" — i.e. the outermost `BatchGuard` whose drop
// must run the drain. Replaces the former Core-global `CoreState::in_tick`
// bool.
//
// **Why per-(Core, thread).** The flag must jointly satisfy three
// constraints that no single-scope placement can:
// - **Cross-Core isolation (/qa F1).** A thread holding a live
//   `BatchGuard` on Core-A and then entering a wave on Core-B must not
//   see Core-A's flag. Keying by `Core::generation` (a process-monotonic
//   id, never reused) gives each Core a distinct slot. A purely
//   thread-local bool failed here (Core-A's `in_tick` leaked to Core-B
//   on the same OS thread → Core-B non-owning → its writes drained by
//   Core-A's binding).
// - **Disjoint-partition drain correctness.** Two threads running waves
//   on disjoint partitions of ONE Core run truly parallel (separate
//   `wave_owner`s — they do not block each other). A Core-global flag
//   made thread B observe thread A's `in_tick=true`, wrongly classify
//   its independent wave as nested, and no-op its drop — so B's wave
//   never drained (leaked payload retains, undelivered sink batches;
//   caught late by `wave_state_clear_outermost`). A per-thread slot
//   makes each thread own and drain its own disjoint wave.
// - **Nested same-(Core, thread) re-entry (/qa EC#3).** A nested
//   `run_wave` / `actions.up` on the same Core and thread MUST observe
//   ownership so its drop no-ops and the outer wave drains. A shared
//   slot within one (Core, thread) preserves this.
//
// **No lock required.** `in_tick` is only ever read or written by the
// wave-owner thread: cross-thread same-partition emits BLOCK on the
// partition `wave_owner`, and cross-partition cascades acquire every
// touched partition upfront. So — unlike `currently_firing`, which the
// cross-thread P13 set_deps check (/qa F2) requires to be Core-global and
// cross-thread-visible — `in_tick` has no cross-thread read requirement
// and needs no shared-state lock. (`currently_firing` deliberately stays
// on `CoreState`; see `node.rs`.)
//
// Stale slots: the owning `BatchGuard::drop` releases the generation on
// every exit path — normal return, the closure-body-panic branch, AND
// the drain-phase-panic `catch_unwind` arm (before `resume_unwind`). So
// a slot can only be left interned if `Drop` itself never runs:
// `std::mem::forget(guard)` or a process abort without unwinding — both
// out of contract (`BatchGuard` is `#[must_use]` + `!Send`). Such a
// leaked key is inert: generations are never reused, so it can never
// false-match a future Core (no correctness impact), and the leak is
// bounded by the number of distinct Cores a thread ever forgets a guard
// on — not an unbounded leak under any normal or panic-recovery path.
//
// History: this flag lived briefly per-thread (Q-beyond sub-slice 3),
// was reverted to Core-global (/qa F1+F2), and is now keyed per-(Core,
// thread) — the placement that satisfies all three constraints at once.
// See `docs/rust-port-decisions.md` and the `docs/porting-deferred.md`
// Phase-J / "in_tick Core-global→per-(Core,thread)" entry (2026-05-15).
thread_local! {
    static IN_TICK_OWNED: RefCell<AHashSet<u64>> = RefCell::new(AHashSet::new());
}

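/*
Illustrative sketch (hypothetical; not part of this crate): the
per-(Core, thread) ownership rule as a thread-local set of generations.
`Core` here carries only a process-monotonic `generation`, and
`WaveGuard` stands in for `BatchGuard`; neither matches the crate's real
types. The outermost guard on a (Core, thread) wins the `insert` and owns
the drain, a nested guard on the same Core finds the slot taken and
becomes non-owning, and a guard on a different Core gets its own slot.

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter: every Core gets a generation that is never reused.
static NEXT_GENERATION: AtomicU64 = AtomicU64::new(0);

thread_local! {
    // Generations of the Cores on which this thread currently owns a wave.
    static OWNED: RefCell<HashSet<u64>> = RefCell::new(HashSet::new());
}

pub struct Core {
    generation: u64,
}

impl Core {
    pub fn new() -> Self {
        Core { generation: NEXT_GENERATION.fetch_add(1, Ordering::Relaxed) }
    }
}

pub struct WaveGuard {
    generation: u64,
    pub owning: bool,
}

impl WaveGuard {
    pub fn enter(core: &Core) -> Self {
        // `insert` returns false if this (Core, thread) already owns a wave.
        let owning = OWNED.with(|o| o.borrow_mut().insert(core.generation));
        WaveGuard { generation: core.generation, owning }
    }
}

impl Drop for WaveGuard {
    fn drop(&mut self) {
        if self.owning {
            // A real owning drop would drain the wave before releasing the slot.
            OWNED.with(|o| o.borrow_mut().remove(&self.generation));
        }
    }
}
```

Only the owning guard removes its generation on drop, so a nested guard's
drop cannot end the outer wave early.
*/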
/// Wave-scoped state previously held under `Core::cross_partition`'s
/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
/// `pending_notify`, 2026-05-09; Sub-slice 3 added `currently_firing`,
/// `in_tick`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
/// `pending_wipes`, `invalidate_hooks_fired_this_wave`, 2026-05-09).
/// `currently_firing` and `in_tick` have since moved out again
/// (`currently_firing` back to `CoreState`, `in_tick` to
/// `IN_TICK_OWNED`); see the note inside the struct body.
///
/// All fields are populated and drained within one wave on one thread.
/// Cross-thread access is structurally impossible — cross-thread emits
/// block on partition `wave_owner` and land in the OTHER thread's wave
/// context.
///
/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
/// `deferred_handle_releases`, and `pending_notify` hold binding-side
/// handle retains. They MUST be drained (and released through
/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
/// on success and panic paths. `pending_notify` holds one retain per
/// payload-bearing message (one per `Message::payload_handle()`); the
/// retains are taken in `Core::queue_notify` and balanced either by
/// `flush_notifications` (success path: pushed into
/// `deferred_handle_releases`) or directly in the panic-discard path of
/// `BatchGuard::drop` (taken from `pending_notify` and released).
///
/// The thread_local has no `Drop` hook with access to a binding, so a
/// panic that bypasses `BatchGuard::drop` (e.g. a panic OUTSIDE any
/// batch) leaves those retains outstanding. The outermost wave-start
/// clear deliberately does not touch the retain-holding fields: clearing
/// without releasing would discard the only record of the retains and
/// make the leak permanent. The defensive wave-start clear in
/// `BatchGuard::begin_batch_with_guards` clears only
/// `pending_auto_resolve`, `pending_pause_overflow`, and
/// `invalidate_hooks_fired_this_wave` (none hold retains). It leaves
/// `pending_fires` intact for `Core::resume`'s pre-staged fire, and
/// `currently_firing` now lives on `CoreState`. The retain-holding
/// fields must be empty by construction at outermost wave start (a
/// prior wave's success or panic-discard path drained them).
pub(crate) struct WaveState {
    /// Payload-handle releases owed for messages that landed in
    /// `pending_notify` during this wave (one per `payload_handle()`).
    /// `BatchGuard::drop` releases these after sinks fire and the lock
    /// is dropped, balancing the retain done in `queue_notify`.
    pub(crate) deferred_handle_releases: Vec<HandleId>,
    /// Pre-wave cache snapshots used to restore state if the wave aborts
    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
    /// the wave started writing to it. The snapshotted handle holds a
    /// retain (taken when the snapshot was inserted) so it stays alive
    /// for restoration. On wave success, snapshots are drained and their
    /// retains released. On wave abort, each cache slot is restored from
    /// the snapshot and the original retain transfers to the cache slot.
    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
    /// Nodes that need an auto-Resolved at wave end if they don't receive
    /// a tier-3+ message from their own commit_emission. Populated by
    /// the RESOLVED child propagation in `commit_emission`. Drained by
    /// the auto-resolve sweep in `drain_and_flush`.
    pub(crate) pending_auto_resolve: AHashSet<NodeId>,
    /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
    /// [`Core::queue_notify`] when the pause buffer first overflows;
    /// drained at wave-end after the lock-released call to
    /// `BindingBoundary::synthesize_pause_overflow_error`.
    pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
    /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
    /// child-cascade `QueueFire` branch, `activate_derived`'s producer
    /// queueing, `resume`'s pending-wave consolidation, and operator
    /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
    /// `fire_regular` / `fire_operator` (each removes the firing node
    /// before invoking).
    pub(crate) pending_fires: AHashSet<NodeId>,
    /// Per-node outgoing message buffer; flushed at wave end.
    /// Insertion-ordered so flush order is deterministic — load-bearing
    /// for R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
    /// companion both have queued messages in the same wave, the meta
    /// (queued first via `teardown_inner`'s recursion order) flushes
    /// first.
    ///
    /// Each entry carries the per-wave subscriber snapshot taken at first
    /// touch (Slice A close, M1: lock-released drain). Late subscribers
    /// installed mid-wave between fn-fire iterations don't appear in
    /// already-snapshotted entries; this is the load-bearing fix that
    /// prevents duplicate-Data delivery when a handshake delivers the
    /// post-commit cache and the wave's flush would otherwise also fire
    /// to the same sink.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_notify` to per-thread `WaveState`. The map
    /// holds a payload-handle retain per payload-bearing message
    /// (`Message::payload_handle()`); these MUST be released by the
    /// outermost `BatchGuard::drop` (success path: through
    /// `flush_notifications` → `deferred_handle_releases`; panic path:
    /// directly in `BatchGuard::drop`'s panic branch).
    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
    // Q-beyond Sub-slice 3 (D108, 2026-05-09) moved `in_tick` and
    // `currently_firing` from `CoreState` to per-thread `WaveState`;
    // /qa F1+F2 (2026-05-10) reverted both to `CoreState`; the in_tick
    // placement was finalized 2026-05-15 (see below). The two fields have
    // *different* scope requirements:
    //
    // - **`in_tick` — per-(Core, thread).** Pure thread-local broke
    //   cross-Core isolation (Core-A's flag leaked to Core-B on the same
    //   OS thread → Core-B non-owning → its writes drained by Core-A's
    //   binding, /qa F1). Pure Core-global broke disjoint-partition drain
    //   ownership (thread B's independent disjoint wave saw thread A's
    //   flag → non-owning → never drained). The (Core, thread) key —
    //   `crate::batch::IN_TICK_OWNED`, keyed by `Core::generation` —
    //   satisfies both, plus same-(Core, thread) nested re-entry
    //   (/qa EC#3). NOT a `CoreState` field.
    //
    // - **`currently_firing` — Core-global (stays on `CoreState`).**
    //   Per-thread placement silently bypassed the cross-thread P13
    //   partition-migration check in `Core::set_deps`: thread B's set_deps
    //   must observe thread A's firing pushes. Per-Core (cross-thread
    //   visible) placement restores the D091 safety check (/qa F2).
    //
    // The remaining wave-scoped fields in this struct stay per-thread
    // because they're accessed only by the wave-owner thread under
    // `wave_owner` discipline (cross-thread emits BLOCK on partition
    // wave_owner).
    /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
    /// for `OnInvalidate` cleanup hook firing. A node already in this
    /// set this wave has already had its `OnInvalidate` queued into
    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
    /// `invalidate_inner` re-encounters it.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
    /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
    /// cleared by `WaveState::clear_wave_state`.
    pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
    /// Deferred sink-fire jobs collected by `flush_notifications`.
    /// `flush_notifications` populates this from `pending_notify`;
    /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
    /// each entry lock-released. Each tuple is
    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
    /// waves.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
    /// retains held — the `Vec<Sink>` clones own Arcs that drop
    /// naturally; the `Vec<Message>` payload retains were already moved
    /// into `deferred_handle_releases` by `flush_notifications`.
    pub(crate) deferred_flush_jobs: DeferredJobs,
    /// Slice E2 (per D060/D061): lock-released drain queue for
    /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
    /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
    /// drained after the lock drops at wave boundary by
    /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
    /// D060). Panic-discarded silently per D061.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
    /// `Core::terminate_node` when a resubscribable node terminates with
    /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
    /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
    /// silently.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::pending_wipes` to per-thread `WaveState`.
    pub(crate) pending_wipes: Vec<NodeId>,
}

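/*
Illustrative sketch (hypothetical; not part of this crate): the per-call
`catch_unwind` discipline described for `deferred_cleanup_hooks` and
`pending_wipes` (D060 / D061). Hooks are modelled as boxed `FnOnce()`
closures, and `Hook` / `fire_deferred_hooks` are illustrative names, not
`Core::fire_deferred`. The queue is taken first, each hook runs inside its
own `std::panic::catch_unwind`, and a panic is discarded so the remaining
hooks still fire.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Illustrative deferred hook: runs once, may panic.
pub type Hook = Box<dyn FnOnce()>;

/// Fires every queued hook and returns how many completed without
/// panicking. Panics are contained and otherwise ignored.
pub fn fire_deferred_hooks(queue: &mut Vec<Hook>) -> usize {
    let mut completed = 0;
    // Take the whole queue first so a panicking hook is never re-run.
    for hook in std::mem::take(queue) {
        if panic::catch_unwind(AssertUnwindSafe(hook)).is_ok() {
            completed += 1;
        }
    }
    completed
}
```

The default panic hook still prints the message to stderr; only the
unwind is contained. `AssertUnwindSafe` is acceptable in this sketch
because nothing afterwards reads state the panicking hook could have left
half-updated.
*/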
impl WaveState {
    fn new() -> Self {
        Self {
            deferred_handle_releases: Vec::new(),
            wave_cache_snapshots: HashMap::new(),
            pending_auto_resolve: AHashSet::new(),
            pending_pause_overflow: Vec::new(),
            pending_fires: AHashSet::new(),
            pending_notify: IndexMap::new(),
            invalidate_hooks_fired_this_wave: AHashSet::new(),
            deferred_flush_jobs: Vec::new(),
            deferred_cleanup_hooks: Vec::new(),
            pending_wipes: Vec::new(),
        }
    }

    /// Wave-end clear of the non-retain-holding fields. Called from
    /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
    /// (`wave_cache_snapshots`, `deferred_handle_releases`,
    /// `pending_notify`) are NOT cleared here — they follow the
    /// success/panic paths' explicit drain discipline in
    /// `BatchGuard::drop`.
    pub(crate) fn clear_wave_state(&mut self) {
        self.pending_auto_resolve.clear();
        // pending_pause_overflow is normally drained by drain_and_flush
        // via the synthesis loop. If a wave is panic-discarded BEFORE
        // synthesis runs, BatchGuard::drop's panic path also clears it
        // explicitly. Pre-wave defensive clear in
        // `begin_batch_with_guards` makes this idempotent.
        self.pending_pause_overflow.clear();
        // Sub-slice 2: pending_fires is intentionally NOT cleared
        // here. Two reasons:
        // 1. Wave-success drain empties it by construction: every
        //    `pick_next_fire` selection is removed by
        //    `fire_regular` / `fire_operator` before invocation,
        //    and `drain_and_flush` only exits when the set is empty.
        // 2. The `Core::resume` default-mode consolidated-fire
        //    pattern stages an entry OUTSIDE any in-tick wave and
        //    then enters a new wave to drain it; clearing here
        //    would erase that pre-staged entry. The panic-discard
        //    path in `BatchGuard::drop` clears it explicitly.

        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState::currently_firing — defensive clear there.
        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
        // wave-scoped — cleared so the next wave can fire cleanups
        // again.
        self.invalidate_hooks_fired_this_wave.clear();
        // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
        // `pending_wipes` are intentionally NOT cleared here. They
        // follow the same discipline as `deferred_handle_releases` /
        // `pending_notify`:
        // - SUCCESS path (`BatchGuard::drop` non-panic): drained by
        //   `Core::drain_deferred` AFTER `clear_wave_state` runs,
        //   then fired lock-released by `Core::fire_deferred`.
        // - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
        //   `std::mem::take`-and-dropped AFTER `clear_wave_state`
        //   runs (silently per D061 / D069).
        // Clearing here would race the success path: queued sink fires
        // / cleanup hooks / wipes would be erased BEFORE
        // `drain_deferred` could take them.
    }
}

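/*
Illustrative sketch (hypothetical; not part of this crate): the drain loop
that the `pending_fires` comments in `clear_wave_state` rely on. Each
selected node is removed from the pending set before its fn "runs", a fire
may queue further nodes, and the loop exits only when the set is empty, so
a successful wave leaves the set empty by construction. The `u32` ids, the
`children` adjacency map, and the ascending pick order of
`BTreeSet::pop_first` are illustrative assumptions, not the crate's
`pick_next_fire` policy.

```rust
use std::collections::{BTreeSet, HashMap};

/// Drains `pending` until it is empty and returns the fire order.
/// `children` maps a node to the nodes its fire marks for re-evaluation.
pub fn drain(pending: &mut BTreeSet<u32>, children: &HashMap<u32, Vec<u32>>) -> Vec<u32> {
    let mut fired = Vec::new();
    // The node is removed BEFORE it "fires", so anything the fire queues
    // (including the node itself) is kept as a new pending entry.
    while let Some(node) = pending.pop_first() {
        fired.push(node); // stand-in for invoking the node's fn
        for &child in children.get(&node).into_iter().flatten() {
            pending.insert(child);
        }
    }
    fired
}
```

In a diamond (1 feeds 2 and 3, both feed 4), node 4 fires once: the
second insert arrives while 4 is still pending, so the set coalesces it.
*/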
/// Run a closure with mutable access to this thread's [`WaveState`].
///
/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
/// for sites that touch ONE field. For sites that interleave state lock
/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
/// pattern).
///
/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
/// on nested borrow. The same discipline that the prior
/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
/// holding cross_partition) carries over.
pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
    WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
}

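/*
Illustrative sketch (hypothetical; not part of this crate): the
re-entrance rule for `with_wave_state` follows directly from `RefCell`.
While the closure runs, the outer `borrow_mut` guard is alive, so a nested
`borrow_mut` would panic. This sketch probes with `try_borrow_mut` to
observe the conflict without panicking; `STATE`, `with_state`, and
`nested_access_allowed` are illustrative names.

```rust
use std::cell::RefCell;

thread_local! {
    static STATE: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

/// Same shape as `with_wave_state`: one `borrow_mut` held for the
/// closure's whole duration.
pub fn with_state<R>(f: impl FnOnce(&mut Vec<u32>) -> R) -> R {
    STATE.with(|cell| f(&mut cell.borrow_mut()))
}

/// Reports whether a nested mutable borrow could succeed while the
/// closure passed to `with_state` is still running.
pub fn nested_access_allowed() -> bool {
    with_state(|_outer| STATE.with(|cell| cell.try_borrow_mut().is_ok()))
}
```

Sequential calls are fine because each guard is dropped when its closure
returns; only overlapping borrows conflict.
*/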
/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
/// outermost owning entry. Mirrors the pre-existing tier3 defensive
/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
/// propagating stale entries from a prior panicked-mid-wave test.
///
/// The retain-holding fields (`wave_cache_snapshots`,
/// `deferred_handle_releases`, `pending_notify`) MUST already be empty by
/// construction at outermost wave entry — outermost `BatchGuard::drop`
/// always drains them on both success and panic paths. If they're
/// non-empty here, a prior wave bypassed `BatchGuard::drop`: debug
/// builds fail the `debug_assert!`s below, and release builds leave the
/// entries for the next outermost `BatchGuard::drop` to drain.
fn wave_state_clear_outermost() {
    with_wave_state(|ws| {
        // /qa F4 (2026-05-10): debug_assert that retain-holding fields
        // are empty at outermost wave start. The invariant claim is
        // "outermost BatchGuard::drop drains them on both success and
        // panic paths, so they're empty before the next wave starts."
        // If a panic path EVER bypasses the drain (today: not reachable
        // because BatchGuard::drop is robust against panicking sinks via
        // catch_unwind), this assert catches it in tests immediately
        // rather than letting stale entries leak into the next wave's
        // drain (which would release Core-A's HandleIds via Core-B's
        // binding under cross-Core same-thread sequential use).
        debug_assert!(
            ws.wave_cache_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain (would leak retains into next wave's \
             binding). See /qa F4 (2026-05-10).",
            ws.wave_cache_snapshots.len()
        );
        debug_assert!(
            ws.deferred_handle_releases.is_empty(),
            "wave_state_clear_outermost: deferred_handle_releases non-empty \
             at outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.deferred_handle_releases.len()
        );
        debug_assert!(
            ws.pending_notify.is_empty(),
            "wave_state_clear_outermost: pending_notify non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.pending_notify.len()
        );
        ws.pending_auto_resolve.clear();
        ws.pending_pause_overflow.clear();
        // Sub-slice 2: pending_fires is intentionally NOT cleared here.
        // Pre-Sub-slice-2 it lived on CoreState and survived between
        // waves; load-bearing for `Core::resume`'s default-mode
        // consolidated-fire pattern, which inserts into pending_fires
        // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
        // false at that moment) and then calls `run_wave_for(node_id)`
        // — `run_wave_for` enters a NEW outermost wave whose drain must
        // pick up that pre-staged pending_fires entry. Clearing here
        // would erase it.
        //
        // pending_fires holds no retains, so a stale entry from a
        // prior panicked-mid-wave test that bypassed BatchGuard::drop
        // would leak as a spurious fire on the next wave on the same
        // thread (no refcount damage). The panic-discard path in
        // BatchGuard::drop and the wave-success drain together
        // guarantee pending_fires is empty by wave end; relying on
        // that invariant matches the pre-refactor lifecycle.
        //
        // Intentionally NOT clearing wave_cache_snapshots /
        // deferred_handle_releases / pending_notify here — those hold
        // retains and need a binding to release. Documented invariant:
        // they're empty by outermost wave start.

        // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
        // defensively clear the OnInvalidate dedup set on outermost-wave
        // entry. Holds no retains; a stale entry from a prior
        // panicked-mid-wave test that bypassed BatchGuard::drop would
        // only suppress the OnInvalidate cleanup hook for that node on
        // the next wave (no refcount damage). Clearing matches the
        // tier3 defensive-clear precedent.
        //
        // `currently_firing` was reverted to CoreState (per /qa F2 — the
        // per-thread placement silently bypassed the cross-thread P13
        // partition-migration check); its defensive clear lives in
        // `CoreState::clear_wave_state` (which BatchGuard::drop runs
        // wave-end on both success and panic paths).
        ws.invalidate_hooks_fired_this_wave.clear();
        // Intentionally NOT clearing deferred_flush_jobs /
        // deferred_cleanup_hooks / pending_wipes here — by invariant
        // they're empty at outermost wave start (drained on success
        // by drain_deferred → fire_deferred; drained on panic by
        // BatchGuard::drop's panic branch). Pre-clearing would race a
        // hypothetical wave that staged into them OUTSIDE in_tick
        // (none does today, but matching the deferred_handle_releases
        // / pending_notify discipline keeps the invariant uniform).
    });
}

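/*
Illustrative sketch (hypothetical; not part of this crate): the
retain-balancing invariant that the `debug_assert!`s in
`wave_state_clear_outermost` check. `Binding` counts retains per handle
id and `Wave` holds only a `pending_notify` list of payload handles; both
are simplified stand-ins for the real binding and `WaveState`. Every
retain taken at queue time is released on either the delivery path or
the panic-discard path, so the queue is empty and the counts balance
before the next outermost wave starts.

```rust
use std::collections::HashMap;

/// Illustrative binding: counts outstanding retains per handle id.
#[derive(Default)]
pub struct Binding {
    refcounts: HashMap<u64, i64>,
}

impl Binding {
    pub fn retain(&mut self, h: u64) {
        *self.refcounts.entry(h).or_insert(0) += 1;
    }

    pub fn release(&mut self, h: u64) {
        *self.refcounts.entry(h).or_insert(0) -= 1;
    }

    /// True when every retain has been matched by a release.
    pub fn balanced(&self) -> bool {
        self.refcounts.values().all(|&n| n == 0)
    }
}

/// Illustrative wave state holding payload handles awaiting delivery.
#[derive(Default)]
pub struct Wave {
    pub pending_notify: Vec<u64>,
}

impl Wave {
    /// Queue a payload-bearing message, taking one retain for it.
    pub fn queue(&mut self, b: &mut Binding, h: u64) {
        b.retain(h);
        self.pending_notify.push(h);
    }

    /// Outermost drop: deliver (success) or discard (panic). Either way
    /// every queued retain is released and the queue ends empty.
    pub fn finish(&mut self, b: &mut Binding, delivered: &mut Vec<u64>, panicked: bool) {
        for h in self.pending_notify.drain(..) {
            if !panicked {
                delivered.push(h); // sinks would observe `h` here
            }
            b.release(h);
        }
    }
}
```
*/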
// Profile-driven optimization (2026-05-10): per-thread partition cache for
// `begin_batch_for`. The common hot-loop pattern is repeated emits to the
// same seed node (e.g., state node in a tight emit loop). Each emit calls
// `begin_batch_for(seed)` which calls `compute_touched_partitions(seed)` —
// a BFS that acquires state + registry locks and allocates a HashSet +
// SmallVec. Since the topology doesn't change between emits (registry epoch
// is stable), we cache the BFS result per-thread and skip the BFS on hit.
//
// Cache validity: keyed on (core_generation, seed, epoch). Any registry mutation
// (register/union/split) bumps epoch → invalidates. The post-acquire epoch
// recheck in `begin_batch_for` catches the (rare) case where a concurrent
// mutation happens between cache read and lock acquisition.
struct PartitionCache {
    /// Monotonic generation from [`crate::node::CORE_GENERATION`]. Avoids
    /// ABA false-hits that `Arc::as_ptr` would suffer after Core drop +
    /// allocator address reuse (/qa F1, 2026-05-10).
    core_generation: u64,
    seed: NodeId,
    epoch: u64,
    partitions: SmallVec<[crate::subgraph::SubgraphId; 4]>,
}

thread_local! {
    static PARTITION_CACHE: RefCell<Option<PartitionCache>> = const { RefCell::new(None) };
}

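/*
Illustrative sketch (hypothetical; not part of this crate): a single-entry
cache keyed on `(core_generation, seed, epoch)` in the spirit of
`PARTITION_CACHE`. `CachedBfs`, `lookup_or_compute`, the `u64` ids, and
the `Vec<u32>` result are illustrative; the `compute` closure stands in
for `compute_touched_partitions`. Bumping the epoch (or switching Cores)
changes the key, so a stale entry simply misses.

```rust
/// Cache key: any change to one of these fields forces a recompute.
#[derive(PartialEq)]
struct Key {
    core_generation: u64,
    seed: u64,
    epoch: u64,
}

/// Illustrative single-entry cache for an expensive topology walk.
#[derive(Default)]
pub struct CachedBfs {
    entry: Option<(Key, Vec<u32>)>,
    pub misses: usize,
}

impl CachedBfs {
    pub fn lookup_or_compute(
        &mut self,
        core_generation: u64,
        seed: u64,
        epoch: u64,
        compute: impl FnOnce() -> Vec<u32>,
    ) -> Vec<u32> {
        let key = Key { core_generation, seed, epoch };
        if let Some((cached_key, parts)) = &self.entry {
            if *cached_key == key {
                return parts.clone(); // hit: skip the walk
            }
        }
        self.misses += 1;
        let parts = compute(); // miss: run the walk and replace the entry
        self.entry = Some((key, parts.clone()));
        parts
    }
}
```

The sketch omits the post-acquire epoch recheck that `begin_batch_for`
performs; with no lock acquisition in between, there is no window for a
concurrent registry mutation to slip in.
*/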
563/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
564/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
565/// wave-scope rationale.
566fn tier3_check(node: NodeId) -> bool {
567 TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
568}
569
570/// Mark `node` as having emitted a tier-3 message in the current wave on
571/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
572fn tier3_mark(node: NodeId) {
573 TIER3_EMITTED_THIS_WAVE.with(|s| {
574 s.borrow_mut().insert(node);
575 });
576}
577
578/// Wave-end clear of the per-thread tier3 tracker. Called from the
579/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
580/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
581/// invoke this — the outer wave is still in flight and inner-wave marks
582/// are part of the outer wave's Slice G coalescing state.
583fn tier3_clear() {
584 TIER3_EMITTED_THIS_WAVE.with(|s| {
585 s.borrow_mut().clear();
586 });
587}
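The tier-3 tracker drives the Slice G rule applied in `commit_emission`: only the first emit at a node in a wave goes through equals substitution; later emits skip it and rewrite any earlier RESOLVED to DATA. The following is a minimal single-threaded sketch, assuming `u64` values compared by identity and modeling only DATA / RESOLVED (no DIRTY, no subscribers). `Node`, `Msg`, `emit`, and `end_wave` are hypothetical names, not this crate's API, and the sketch marks the node on every emit.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

type NodeId = u64;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Msg {
    Data(u64),
    Resolved,
}

thread_local! {
    // Nodes that already queued DATA or RESOLVED in the current wave.
    static EMITTED_THIS_WAVE: RefCell<HashSet<NodeId>> = RefCell::new(HashSet::new());
}

struct Node {
    id: NodeId,
    cache: u64,
    queued: Vec<Msg>,
}

fn emit(node: &mut Node, value: u64) {
    let subsequent = EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node.id));
    if subsequent {
        // Multi-emit wave: no equals substitution. If a RESOLVED is queued,
        // it came from the first emit, which left the cache unchanged, so in
        // that case the cache still holds the wave-start value.
        let wave_start = node.cache;
        for m in node.queued.iter_mut() {
            if *m == Msg::Resolved {
                *m = Msg::Data(wave_start);
            }
        }
        node.cache = value;
        node.queued.push(Msg::Data(value));
    } else if value == node.cache {
        // First emit in the wave with an equal value: equals substitution.
        node.queued.push(Msg::Resolved);
    } else {
        node.cache = value;
        node.queued.push(Msg::Data(value));
    }
    EMITTED_THIS_WAVE.with(|s| {
        s.borrow_mut().insert(node.id);
    });
}

/// Outermost wave end: forget every mark so the next wave starts clean.
fn end_wave() {
    EMITTED_THIS_WAVE.with(|s| s.borrow_mut().clear());
}
```

Emitting 5 then 6 into a node cached at 5 first queues RESOLVED, then rewrites it so the wave carries DATA(5) followed by DATA(6).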

/// Deferred sink-fire jobs collected during `flush_notifications`. Each
/// entry pairs a snapshot of the sink Arcs to fire with the messages to
/// deliver to them — one entry per (node × phase) cell with non-empty
/// content. Drained from `CoreState` and fired lock-released.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;

/// Lock-released drain payload of the wave's BatchGuard:
/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
/// Factored out into a type alias to satisfy `clippy::type_complexity`.
pub(crate) type WaveDeferred = (
    DeferredJobs,
    Vec<HandleId>,
    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
    Vec<crate::handle::NodeId>,
);
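The `DeferredJobs` pattern (collect sink jobs while the state lock is held, fire them after it is released) can be sketched with a plain `std::sync::Mutex`. This is an illustration under assumptions: `State`, `fire_deferred`, and the string message type are hypothetical stand-ins for `CoreState`, `Core::fire_deferred`, and `Message`. Because the guard is gone before any sink runs, a sink that locks the same mutex does not deadlock.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins: a sink is a shared callback, a message is a string.
type Sink = Arc<dyn Fn(&str) + Send + Sync>;
type DeferredJobs = Vec<(Vec<Sink>, Vec<String>)>;

struct State {
    deferred: DeferredJobs,
    log: Vec<String>,
}

/// Take every queued job under the lock, then fire the sinks lock-released.
fn fire_deferred(state: &Mutex<State>) {
    // The temporary MutexGuard is dropped at the end of this statement,
    // so nothing below runs while the lock is held.
    let jobs = std::mem::take(&mut state.lock().unwrap().deferred);
    for (sinks, messages) in jobs {
        for sink in &sinks {
            for m in &messages {
                sink(m.as_str());
            }
        }
    }
}
```

In the usage below, the sink re-locks `state` to record what it saw; firing it while `fire_deferred` still held the guard would deadlock.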

/// One subscriber-snapshot epoch within a node's wave-end notification
/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
/// for the node in a wave, and a fresh batch is opened whenever the node's
/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
/// existing sink unsubscribes, or a handshake-time panic evicts an
/// orphaned sink). All messages within one batch flush to the same sink
/// list — the snapshot taken when the batch opened, frozen against
/// subsequent revision bumps.
pub(crate) struct PendingBatch {
    /// `NodeRecord::subscribers_revision` value at the moment this batch
    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
    /// open-fresh-batch on every push.
    pub(crate) snapshot_revision: u64,
    /// Subscriber snapshot frozen at batch-open time. `SmallVec<[_; 1]>`
    /// inlines the common single-subscriber case (avoids heap alloc for
    /// the dominant 1-sink-per-node pattern in most reactive graphs).
    pub(crate) sinks: SmallVec<[Sink; 1]>,
    /// Messages queued to this batch. `SmallVec<[_; 3]>` inlines the
    /// common per-node-per-wave message set (DIRTY + DATA + optional
    /// RESOLVED) without heap allocation.
    pub(crate) messages: SmallVec<[Message; 3]>,
}

/// Per-node wave-end notification queue, structured as one or more
/// subscriber-snapshot epochs (`batches`). The common case (no
/// mid-wave subscribe / unsubscribe at this node) uses a single
/// inline batch; `SmallVec<[_; 1]>` keeps that allocation-free.
///
/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
/// `(sinks, messages)` pair per node — the snapshot froze on first
/// `queue_notify` and was reused for every subsequent emit to the same
/// node in the wave. That caused the documented late-subscriber +
/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
/// between two emits to the same node was invisible to the second
/// emit's flush slice. The revision-tracked batch list resolves it:
/// late subscribers land in a fresh batch whose frozen snapshot
/// includes them, while pre-subscribe batches retain their original
/// snapshot, so the new subscriber doesn't receive earlier emits twice
/// (once via flush and once via the subscribe handshake).
pub(crate) struct PendingPerNode {
    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}

impl PendingPerNode {
    /// Iterate every queued message for this node across all batches in
    /// arrival order. Used by R1.3.3.a invariant assertions and the
    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
    /// reason about wave-content per node, not per batch.
    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
        self.batches.iter().flat_map(|b| b.messages.iter())
    }

    /// Mutable counterpart of `iter_messages`. Used by
    /// `rewrite_prior_resolved_to_data` to rewrite Resolved entries to
    /// Data in place when a wave detects a multi-emit case after the
    /// fact.
    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
    }
}
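The revision-tracked batching can be reduced to a few lines. This is a minimal sketch, assuming `Vec` in place of `SmallVec` and string labels in place of `Sink` / `Message`; `NodeRecord::subscribe`, `queue_notify`, and `flush` here are simplified hypothetical versions, not the crate's functions. A message joins the last batch only if that batch's `snapshot_revision` still matches the node's current revision; otherwise a fresh batch freezes the current subscriber list.

```rust
// Simplified stand-ins: labels instead of Sink / Message, Vec instead of SmallVec.
struct PendingBatch {
    snapshot_revision: u64,
    sinks: Vec<&'static str>,
    messages: Vec<&'static str>,
}

#[derive(Default)]
struct PendingPerNode {
    batches: Vec<PendingBatch>,
}

struct NodeRecord {
    subscribers: Vec<&'static str>,
    subscribers_revision: u64,
}

impl NodeRecord {
    fn subscribe(&mut self, sink: &'static str) {
        self.subscribers.push(sink);
        // Any change to the subscriber list advances the revision.
        self.subscribers_revision += 1;
    }
}

fn queue_notify(rec: &NodeRecord, pending: &mut PendingPerNode, msg: &'static str) {
    let same_epoch = pending
        .batches
        .last()
        .is_some_and(|b| b.snapshot_revision == rec.subscribers_revision);
    if same_epoch {
        // Subscriber list unchanged since the last batch opened: append.
        pending.batches.last_mut().unwrap().messages.push(msg);
    } else {
        // First touch this wave, or the list changed: open a fresh batch
        // whose sink snapshot stays frozen from here on.
        pending.batches.push(PendingBatch {
            snapshot_revision: rec.subscribers_revision,
            sinks: rec.subscribers.clone(),
            messages: vec![msg],
        });
    }
}

/// Wave-end flush: each batch's messages go to that batch's frozen sinks.
fn flush(pending: PendingPerNode) -> Vec<(&'static str, &'static str)> {
    let mut delivered = Vec::new();
    for batch in pending.batches {
        for &msg in &batch.messages {
            for &sink in &batch.sinks {
                delivered.push((sink, msg));
            }
        }
    }
    delivered
}
```

If sink B subscribes between two emits, B receives only the second DATA while A receives everything, which is the late-subscriber behavior the `PendingPerNode` doc comment describes.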

/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
///
/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
/// `set_deps(N, ...)` from inside N's own fn-fire with
/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
/// a different dep ordering than Phase-3's `tracked` storage.
///
/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
/// callbacks like `project_each` / `predicate_each`). Drop fires even
/// on panic, so the stack stays balanced under user-fn unwinds.
///
/// Membership semantics (NOT strict LIFO): the only consumer of
/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
/// `contains(&n)`, a set-membership test. Drop removes the right-most
/// matching `node_id` via `rposition` + `swap_remove`. Because
/// `swap_remove` moves the last element into the vacated slot, an
/// out-of-order drop (e.g. guards from different threads interleaving on
/// the shared `CoreState` stack) reorders the stack: for `[A, B, A]`,
/// dropping B's guard while the inner A is still live removes index 1
/// and moves the trailing A into it, leaving `[A, A]`. The physical order
/// of the remaining entries may then differ from construction order, but
/// membership is preserved. If a future call site needs strict LIFO
/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
pub(crate) struct FiringGuard {
    core: Core,
    node_id: NodeId,
}

impl FiringGuard {
    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState (cross-thread visible, restoring the D091 P13 check).
        // Push under the state lock scope.
        {
            let mut s = core.lock_state();
            s.currently_firing.push(node_id);
        }
        Self {
            core: core.clone(),
            node_id,
        }
    }
}

impl Drop for FiringGuard {
    fn drop(&mut self) {
        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState. Pop under state lock.
        {
            let mut s = self.core.lock_state();
            // Pop the right-most matching node_id (membership semantics —
            // not strict LIFO). If absent, an external rebalance already
            // popped — silent no-op (panic-in-Drop is poison).
            if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
                s.currently_firing.swap_remove(pos);
            }
        }
    }
}
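The membership-not-LIFO behavior of `FiringGuard::drop` can be checked with a single-threaded model. This sketch is a stand-in under assumptions: `ModelGuard` is a hypothetical name, it uses `Rc<RefCell<Vec<char>>>` instead of the `CoreState` stack behind `lock_state()`, and the usage calls `drop` explicitly to force the out-of-order release that, in the engine, would come from interleaving guards.

```rust
use std::cell::RefCell;
use std::rc::Rc;

type NodeId = char;

/// Single-threaded model of FiringGuard: push on construction, remove the
/// right-most matching entry on drop.
struct ModelGuard {
    stack: Rc<RefCell<Vec<NodeId>>>,
    node_id: NodeId,
}

impl ModelGuard {
    fn new(stack: &Rc<RefCell<Vec<NodeId>>>, node_id: NodeId) -> Self {
        stack.borrow_mut().push(node_id);
        Self { stack: Rc::clone(stack), node_id }
    }
}

impl Drop for ModelGuard {
    fn drop(&mut self) {
        let mut s = self.stack.borrow_mut();
        // Right-most match + swap_remove: O(1) removal, but the last element
        // moves into the vacated slot, so order is not preserved.
        if let Some(pos) = s.iter().rposition(|n| *n == self.node_id) {
            s.swap_remove(pos);
        }
    }
}
```

With guards for A, B, A live, dropping B first leaves `['A', 'A']`; `contains(&'A')` still holds, which is all the `set_deps` check needs.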

/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
/// uninitialized or the contained type doesn't match `T` — both are
/// invariant violations for any `fire_op_*` helper that should only be
/// called from `fire_operator`'s match arm for the matching variant.
fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
    s.require_node(node_id)
        .op_scratch
        .as_ref()
        .expect("op_scratch slot uninitialized for operator node")
        .as_any_ref()
        .downcast_ref::<T>()
        .expect("op_scratch type mismatch")
}

/// Mutable borrow of the per-operator scratch slot. Same invariants as
/// [`scratch_ref`].
fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
    s.require_node_mut(node_id)
        .op_scratch
        .as_mut()
        .expect("op_scratch slot uninitialized for operator node")
        .as_any_mut()
        .downcast_mut::<T>()
        .expect("op_scratch type mismatch")
}
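The scratch-slot helpers follow the usual `Any`-downcast pattern for type-erased per-node state. This is a minimal sketch, assuming a trait shaped like `OperatorScratch` with `as_any_ref` / `as_any_mut` methods (the real trait is defined in `crate::op_state` and not shown here) and a hypothetical `ScanScratch` payload; `NodeRecord` is reduced to the one field the helpers touch.

```rust
use std::any::Any;

// Hypothetical mirror of an OperatorScratch-style trait: each scratch
// type exposes itself as `dyn Any` so callers can downcast it.
trait OperatorScratch: Any {
    fn as_any_ref(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

// Hypothetical scratch payload for a running-sum operator.
struct ScanScratch {
    acc: i64,
}

impl OperatorScratch for ScanScratch {
    fn as_any_ref(&self) -> &dyn Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

struct NodeRecord {
    op_scratch: Option<Box<dyn OperatorScratch>>,
}

fn scratch_ref<T: OperatorScratch>(rec: &NodeRecord) -> &T {
    rec.op_scratch
        .as_ref()
        .expect("op_scratch slot uninitialized for operator node")
        // Method lookup derefs through the Box to `dyn OperatorScratch`.
        .as_any_ref()
        .downcast_ref::<T>()
        .expect("op_scratch type mismatch")
}

fn scratch_mut<T: OperatorScratch>(rec: &mut NodeRecord) -> &mut T {
    rec.op_scratch
        .as_mut()
        .expect("op_scratch slot uninitialized for operator node")
        .as_any_mut()
        .downcast_mut::<T>()
        .expect("op_scratch type mismatch")
}
```

Routing through the trait's own `as_any_*` methods matters: calling `Any` methods on the `Box` itself would yield the `Box` type's `TypeId` and every downcast would fail.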

impl Core {
    // -------------------------------------------------------------------
    // Wave entry + drain
    // -------------------------------------------------------------------

    /// Wave entry. The caller passes a closure that performs the wave's
    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
    /// The closure runs lock-released; closure-internal Core methods
    /// acquire the state lock as they go.
    ///
    /// **Implementation:** delegates to [`Self::begin_batch`] for the
    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
    /// emits block; same-thread re-entry passes through), claims `in_tick`,
    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
    /// closure panicked, the panic-discard path that restores cache
    /// snapshots and clears in_tick. This unification gives `run_wave` the
    /// same panic-safety guarantee as the user-facing `Core::batch`.
    ///
    /// **Re-entrance:** when a closure runs from inside another wave, the
    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, so the
    /// returned guard is non-owning (`owns_tick=false`) and its drop is a
    /// no-op. The outer wave's drain picks up the inner closure's queued
    /// work.
    ///
    /// **Lock-release discipline (Slice A close, M1):** all binding-side
    /// callbacks except the subscribe-time handshake fire lock-released.
    /// Sinks, user fns, and custom-equals oracles that re-enter Core each
    /// run a nested wave. Cross-thread emits block at `wave_owner` until
    /// the in-flight wave's drain completes, preserving the user-facing
    /// "emit returning means subscribers have observed" contract.
    ///
    /// Wave entry with a known `seed` node. Acquires only the partitions
    /// transitively touched from `seed` (downstream cascade via
    /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
    /// current partition. This is the canonical Y1 parallelism win for the
    /// per-seed entry points (`Core::emit`, `Core::subscribe`'s activation,
    /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
    /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
    /// push-on-subscribe).
    ///
    /// Two threads with disjoint touched-partition sets run truly in
    /// parallel; they don't block each other on Core-global locks.
    /// Same-thread re-entry passes through each partition's
    /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
    /// partition (or any overlapping touched-partition set) serialize
    /// on the per-partition `wave_owner` mutex, preserving the
    /// "emit returning means subscribers have observed" contract.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
    where
        F: FnOnce(&Self),
    {
        let _guard = self.begin_batch_for(seed);
        op(self);
    }

    /// Fallible wave entry. Returns `Err` if partition acquisition violates
    /// ascending order (Phase H+ STRICT, D115). Used by `try_emit` /
    /// `try_complete` / `try_error`; the infallible `run_wave_for` calls
    /// `begin_batch_for`, which panics on violation.
    pub(crate) fn try_run_wave_for<F>(
        &self,
        seed: crate::handle::NodeId,
        op: F,
    ) -> Result<(), crate::node::PartitionOrderViolation>
    where
        F: FnOnce(&Self),
    {
        let _guard = self.try_begin_batch_for(seed)?;
        op(self);
        Ok(())
    }

    /// Drain the handle retains held by `wave_cache_snapshots` and return
    /// them so the caller can release them lock-released. Called from the
    /// wave-success path in [`BatchGuard::drop`].
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
    /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
    /// drain-and-release-lock-released discipline (introduced as /qa A1
    /// fix 2026-05-09 against the prior cross_partition mutex) carries
    /// over: the caller drains under the WaveState borrow + state lock and
    /// releases only after both are dropped, because `release_handle` may
    /// re-enter Core via finalizers, and re-entry under either guard would
    /// deadlock or double-borrow.
    #[must_use]
    pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
        if ws.wave_cache_snapshots.is_empty() {
            return Vec::new();
        }
        std::mem::take(&mut ws.wave_cache_snapshots)
            .into_values()
            .collect()
    }

    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
    ///
    /// For each snapshotted node:
    ///
    /// 1. Read the current cache (the in-flight new value).
    /// 2. Set `cache = old_handle` (the snapshot's retained value).
    /// 3. Queue the now-unowned current cache handle (unless it is
    ///    `NO_HANDLE`) for release.
    ///
    /// If the node no longer exists, the snapshot's retained `old_handle`
    /// is queued for release instead.
    ///
    /// Returns the list of handles to release outside the lock.
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
    /// to per-thread `WaveState`; signature takes both `s` (for cache
    /// slots) and `ws` (for the snapshots map).
    pub(crate) fn restore_wave_cache_snapshots(
        &self,
        s: &mut CoreState,
        ws: &mut WaveState,
    ) -> Vec<HandleId> {
        if ws.wave_cache_snapshots.is_empty() {
            return Vec::new();
        }
        let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
        let mut releases = Vec::with_capacity(snapshots.len());
        for (node_id, old_handle) in snapshots {
            let Some(rec) = s.nodes.get_mut(&node_id) else {
                releases.push(old_handle);
                continue;
            };
            let current = std::mem::replace(&mut rec.cache, old_handle);
            if current != NO_HANDLE {
                releases.push(current);
            }
        }
        releases
    }

    /// Drain pending fires until quiescent, then flush wave-end notifications
    /// to subscribers. Each fire iteration drops the state lock around the
    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
    ///
    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
    pub(crate) fn drain_and_flush(&self) {
        let mut guard = 0u32;
        loop {
            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
            // queued pause-overflow ERRORs, synthesize them now. The
            // resulting ERROR cascade may add to pending_fires (children
            // settling their terminal state), so we loop back to drain.
            //
            // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
            // and pending_pause_overflow both live on per-thread
            // WaveState. State lock no longer required for either read.
            let synth_pending = with_wave_state(|ws| {
                if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
                    std::mem::take(&mut ws.pending_pause_overflow)
                } else {
                    Vec::new()
                }
            });
            for entry in synth_pending {
                // Lock-released call to the binding hook. Default impl
                // returns None — the binding has opted out of R1.3.8.c
                // and we fall back to silent-drop + ResumeReport.dropped.
                let handle = self.binding.synthesize_pause_overflow_error(
                    entry.node_id,
                    entry.dropped_count,
                    entry.configured_max,
                    entry.lock_held_ns / 1_000_000,
                );
                if let Some(h) = handle {
                    // Re-enter Core::error to terminate the node and
                    // cascade. We're inside a wave (`in_tick = true`),
                    // so error() gets a non-owning batch guard — it
                    // doesn't try to start its own drain. The cascade
                    // queues into our outer drain via pending_fires
                    // and pending_notify.
                    self.error(entry.node_id, h);
                }
            }

            // Pick next fire under a short lock. Also re-read the configured
            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
            // without restarting waves mid-flight.
            //
            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
            // on per-thread WaveState; pick_next_fire takes both state and
            // WaveState. The pending_size diagnostic and emptiness check
            // also read WaveState. Borrow scopes are split: WaveState
            // borrow drops before fire_fn runs (which re-borrows WaveState
            // via fire_regular / fire_operator).
            let (next, cap, pending_size) = {
                let s = self.lock_state();
                let cap = s.max_batch_drain_iterations;
                let (next, pending_size) = with_wave_state(|ws| {
                    if ws.pending_fires.is_empty() {
                        return (None, 0);
                    }
                    let size = ws.pending_fires.len();
                    let next = Self::pick_next_fire(&s, ws);
                    (next, size)
                });
                if pending_size == 0 {
                    break;
                }
                (next, cap, pending_size)
            };
            guard += 1;
            assert!(
                guard < cap,
                "wave drain exceeded {cap} iterations \
                 (pending_fires={pending_size}). Most likely cause: a runtime \
                 cycle introduced by an operator that re-arms its own pending_fires \
                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
                 itself, or a fn that calls Core::emit on a node whose fn fires \
                 the original node again). Structural cycles via set_deps are \
                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
                 only with concrete evidence the workload needs more iterations."
            );
            let Some(next) = next else { break };
            // fire_fn manages its own locking around invoke_fn.
            self.fire_fn(next);
        }
        // Auto-resolve sweep: nodes registered in pending_auto_resolve
        // by the RESOLVED child propagation need a Resolved if they didn't
        // fire and settle via their own commit_emission. Check pending_notify
        // for each candidate — if it has Dirty but no tier-3+ message, the
        // node never settled and needs auto-Resolved. Route through
        // queue_notify so paused nodes get the Resolved into their pause
        // buffer.
        let mut s = self.lock_state();
        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
        // and pending_notify both live on per-thread WaveState. /qa A5
        // fix (2026-05-09): explicit scope for the WaveState borrow so
        // it drops BEFORE the for-loop. Inside the loop, `queue_notify`
        // re-borrows WaveState for `pending_pause_overflow.push` /
        // `pending_notify` writes — re-entrance on RefCell::borrow_mut
        // would panic. Explicit scope makes the lifetime load-bearing.
        let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
        for node_id in candidates {
            let needs_resolve = with_wave_state(|ws| {
                ws.pending_notify
                    .get(&node_id)
                    .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
            });
            if needs_resolve {
                self.queue_notify(&mut s, node_id, Message::Resolved);
            }
        }
        // Final flush phase — populates deferred_flush_jobs
        // from pending_notify (already carries per-node sink snapshots).
        self.flush_notifications(&mut s);
    }

    /// Pick the pending node with the lowest topological rank.
    ///
    /// Because `topo_rank = 1 + max dep rank`, every transitive upstream
    /// of a node has a strictly lower rank, so the minimum-rank node in
    /// `pending_fires` cannot have a transitive upstream that is still
    /// pending. This is O(|pending_fires|) instead of the prior O(N·V)
    /// BFS. §10 perf optimization (D047, Slice U).
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` lives on
    /// per-thread `WaveState`. Caller passes `&WaveState` alongside
    /// `&CoreState` so the borrow scopes stay disjoint and visible.
    fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
        ws.pending_fires
            .iter()
            .copied()
            .min_by_key(|&id| s.nodes.get(&id).map_or(0, |r| r.topo_rank))
    }

    /// Wave drain entry point. Dispatches via `rec.op` to either the
    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
    /// dispatch ([`Self::fire_operator`]).
    pub(crate) fn fire_fn(&self, node_id: NodeId) {
        let op = {
            let s = self.lock_state();
            s.nodes.get(&node_id).and_then(|r| r.op)
        };
        match op {
            Some(operator_op) => self.fire_operator(node_id, operator_op),
            None => {
                // State / Derived / Dynamic / Producer all dispatch via fn_id.
                self.fire_regular(node_id);
            }
        }
    }

    /// Fire a node's fn lock-released around `invoke_fn`.
    ///
    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
    /// or stateless.
    ///
    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
    /// wave — the in_tick gate composes naturally because nested calls
    /// observe `in_tick = true` and skip their own drain.
    ///
    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
    /// decide between Noop+RESOLVED, single Data, or Batch.
    ///
    /// Phase 4: commit emissions. Single Data goes through
    /// `commit_emission` (with equals substitution). Batch emissions are
    /// processed in sequence — Data via `commit_emission_verbatim` (no
    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
    /// terminal cascade.
    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
    fn fire_regular(&self, node_id: NodeId) {
        enum FireAction {
            None,
            SingleData(HandleId),
            Batch(SmallVec<[FnEmission; 2]>),
        }

        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
        // (Phase 1.5 below): the cleanup hook only fires when the fn has
        // run at least once already in this activation cycle.
        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
            let s = self.lock_state();
            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
            // on per-thread WaveState. Removed via with_wave_state — no
            // re-entry concern because only the immediate remove happens
            // under the borrow.
            with_wave_state(|ws| {
                ws.pending_fires.remove(&node_id);
            });
            let rec = s.require_node(node_id);
            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
            // mode opts out of the gate per D011), or stateless.
            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
                None
            } else {
                rec.fn_id.map(|fn_id| {
                    let use_mask = rec.dep_records.len() <= 64;
                    let mask = rec.involved_mask;
                    let dep_batches: Vec<DepBatch> = rec
                        .dep_records
                        .iter()
                        .enumerate()
                        .map(|(i, dr)| DepBatch {
                            data: dr.data_batch.clone(),
                            prev_data: dr.prev_data,
                            // §10.3 perf (Slice V1): derive from bitmask
                            // for ≤64 deps; fall back to per-dep field.
                            involved: if use_mask {
                                (mask >> i) & 1 != 0
                            } else {
                                dr.involved_this_wave
                            },
                        })
                        .collect();
                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
                })
            }
        };
        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
            return;
        };

        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
        // the fn has fired at least once in this activation cycle, fire its
        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
        // local resources. First-fire is intentionally skipped — there is
        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
        // cleanup re-entrance is not the A6 reentrancy concern (which
        // protects against `set_deps(self, ...)` from inside the in-flight
        // invoke_fn). Operator nodes never reach this path (`fire_regular`
        // is the fn-id branch of `fire_fn`; operators dispatch via
        // `fire_operator`), so cleanup hooks correctly only fire for fn-
        // shaped nodes (state / derived / dynamic / producer).
        if has_fired_once {
            self.binding
                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
        }

        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
        // the FFI call only — Phase 3's lock-held state mutation is not part
        // of "currently firing" because set_deps would already block on the
        // state lock by then. Drop on the guard pops the stack even if
        // invoke_fn panics, keeping `currently_firing` balanced.
        let result = {
            let _firing = FiringGuard::new(self, node_id);
            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
        };

        // Phase 3: apply result under the lock — defensive terminal check
        // (a sibling cascade may have terminated this node during phase 2).
        let action: FireAction = {
            let mut s = self.lock_state();
            // Defensive: node may have terminated mid-phase-2 via a sibling
            // cascade (a fn that re-entered `Core::error` on a path that
            // cascaded here). If so, release any payload handles and no-op.
            if s.require_node(node_id).terminal.is_some() {
                match &result {
                    FnResult::Data { handle, .. } => {
                        self.binding.release_handle(*handle);
                    }
                    FnResult::Batch { emissions, .. } => {
                        for em in emissions {
                            match em {
                                FnEmission::Data(h) | FnEmission::Error(h) => {
                                    self.binding.release_handle(*h);
                                }
                                FnEmission::Complete => {}
                            }
                        }
                    }
                    FnResult::Noop { .. } => {}
                }
                return;
            }
            let rec = s.require_node_mut(node_id);
            rec.has_fired_once = true;
            if is_dynamic {
                let tracked = match &result {
                    FnResult::Data { tracked, .. }
                    | FnResult::Noop { tracked }
                    | FnResult::Batch { tracked, .. } => tracked.clone(),
                };
                if let Some(t) = tracked {
                    rec.tracked = t.into_iter().collect();
                }
            }
            match result {
                FnResult::Noop { .. } => {
                    // Slice G: skip Resolved if a prior emission in the same
                    // wave already queued tier-3 (would violate R1.3.3.a).
                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
                    // lives on per-thread WaveState. Borrow scoped to the
                    // tier3 read so queue_notify (which re-borrows
                    // WaveState) doesn't double-borrow.
                    let already_dirty = s.require_node(node_id).dirty;
                    let already_tier3 = with_wave_state(|ws| {
                        ws.pending_notify
                            .get(&node_id)
                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
                    });
                    if already_dirty && !already_tier3 {
                        self.queue_notify(&mut s, node_id, Message::Resolved);
                    }
                    FireAction::None
                }
                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
                    // Empty Batch is equivalent to Noop — settle with
                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
                    // skip if a prior emission already queued tier-3.
                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
                    // arm above for the WaveState borrow scope rationale.
                    let already_dirty = s.require_node(node_id).dirty;
                    let already_tier3 = with_wave_state(|ws| {
                        ws.pending_notify
                            .get(&node_id)
                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
                    });
                    if already_dirty && !already_tier3 {
                        self.queue_notify(&mut s, node_id, Message::Resolved);
                    }
                    FireAction::None
                }
                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
            }
        };

        // Phase 4: commit emissions.
        match action {
            FireAction::None => {}
            // Single Data — equals substitution applies (R1.3.2).
            FireAction::SingleData(handle) => {
                self.commit_emission(node_id, handle);
            }
            // Batch — process in sequence. No equals substitution
            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
            FireAction::Batch(emissions) => {
                self.commit_batch(node_id, emissions);
            }
        }
    }

    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
    /// through `commit_emission_verbatim` (no equals substitution per
    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
    /// cascade per R1.3.4; processing stops at the first terminal and
    /// remaining handles are released (R1.3.4.a: no further messages
    /// after terminal).
    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
        let mut iter = emissions.into_iter();
        for em in iter.by_ref() {
            match em {
                FnEmission::Data(handle) => {
                    self.commit_emission_verbatim(node_id, handle);
                }
                FnEmission::Complete => {
                    self.complete(node_id);
                    break;
                }
                FnEmission::Error(handle) => {
                    self.error(node_id, handle);
                    break;
                }
            }
        }
        // Release handles from any emissions after the terminal break.
        for em in iter {
            match em {
                FnEmission::Data(h) | FnEmission::Error(h) => {
                    self.binding.release_handle(h);
                }
                FnEmission::Complete => {}
            }
        }
    }

    // -------------------------------------------------------------------
    // Emission commit — equals-substitution lives here
    // -------------------------------------------------------------------

    /// Apply a node's emission. `&self`-only; brackets the equals check
    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
    /// Core safely.
    ///
    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
    /// equals_mode + old cache handle.
    ///
    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
    /// crosses to the binding's `custom_equals` oracle, which may re-enter
    /// Core.
    ///
    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
    /// pending_notify (which snapshots subscribers on first touch),
    /// propagate to children.
    // Q2 / Q3 (2026-05-09) pushed this function past clippy's 100-line
    // threshold; it is already a multi-phase wave-engine routine, and
    // breaking out the four phases would obscure the lock discipline.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
        assert!(
            new_handle != NO_HANDLE,
            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
        );

        // Phase 1: terminal short-circuit + snapshot equals/cache.
        let snapshot = {
            let s = self.lock_state();
            let rec = s.require_node(node_id);
            if rec.terminal.is_some() {
                drop(s);
                self.binding.release_handle(new_handle);
                return;
            }
            (rec.cache, rec.equals)
        };
        let (old_handle, equals_mode) = snapshot;

        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
        // fires for SINGLE-DATA waves at one node. Detect "this is a
        // subsequent emit in the same wave at this node" via the
        // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
        // (D1 patch, 2026-05-09 — moved off per-partition state to be
        // robust against mid-wave cross-thread `set_deps` partition
        // splits). If set → multi-emit wave: skip equals, queue Data
        // verbatim, retroactively rewrite any prior Resolved (queued by
        // an earlier same-value emit's equals match) to Data using the
        // wave-start cache snapshot. Outside batch / first emit:
        // standard per-emit equals path. Thread-local lookup is
        // ~5ns and lock-free.
        let is_subsequent_emit_in_wave = tier3_check(node_id);

        if is_subsequent_emit_in_wave {
            // Multi-emit wave detected. Skip equals, queue Data verbatim.
            // Also rewrite any prior Resolved entries to Data using the
            // wave-start cache snapshot.
            self.rewrite_prior_resolved_to_data(node_id);
            self.commit_emission_verbatim(node_id, new_handle);
            return;
        }

        // Phase 2: equals check (lock-released for Custom).
        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);

        // Phase 3: apply emission under the lock. Defensive terminal
        // re-check — a concurrent cascade between phase 2 and phase 3
        // could have terminated the node.
        let mut s = self.lock_state();
        if s.require_node(node_id).terminal.is_some() {
            drop(s);
            self.binding.release_handle(new_handle);
            return;
        }

        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
        // dirty from an earlier emission in the same wave.
        let already_dirty = s.require_node(node_id).dirty;
        s.require_node_mut(node_id).dirty = true;
        if !already_dirty {
            self.queue_notify(&mut s, node_id, Message::Dirty);
        }

        if is_data {
            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
            // re-entry from a `custom_equals` oracle that called back into
            // `Core::emit` on this same node during phase 2's lock-released
            // equals check could have advanced the cache between phase 1's
            // snapshot (`old_handle`) and this point.
            let current_cache = s.require_node(node_id).cache;
            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
            // lives on per-thread WaveState. `in_tick` is per-(Core,
            // thread) (`IN_TICK_OWNED`); this read is on the wave-owner
            // thread, so it observes this thread's own ownership.
            let in_tick = self.in_tick();
            let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
                use std::collections::hash_map::Entry;
                with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
                    Entry::Vacant(slot) => {
                        slot.insert(current_cache);
                        true
                    }
                    Entry::Occupied(_) => false,
                })
            } else {
                false
            };
            s.require_node_mut(node_id).cache = new_handle;
            if current_cache != NO_HANDLE && !snapshot_taken {
                self.binding.release_handle(current_cache);
            }
            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
            // buffer if the node opted in. RESOLVED entries are NOT
1379 // buffered (canonical "DATA only").
1380 self.push_replay_buffer(&mut s, node_id, new_handle);
1381 // Slice G (D1 patch, 2026-05-09): mark this node as having
1382 // emitted tier-3 in this wave on the per-thread tracker.
1383 tier3_mark(node_id);
1384 self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1385 // Propagate to children
1386 let child_ids: Vec<NodeId> = s
1387 .children
1388 .get(&node_id)
1389 .map(|c| c.iter().copied().collect())
1390 .unwrap_or_default();
1391 for child_id in child_ids {
1392 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1393 if let Some(idx) = dep_idx {
1394 self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1395 }
1396 }
1397 } else {
1398 // RESOLVED: handle unchanged. Don't release; old still in use.
1399 // Slice G: snapshot cache so a subsequent same-wave emit can
1400 // rewrite this Resolved to Data using the snapshot.
1401 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1402 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1403 // `in_tick` is per-(Core, thread); read on the wave-owner
1404 // thread (observes this thread's own ownership).
1405 let current_cache = s.require_node(node_id).cache;
1406 if self.in_tick() && current_cache != NO_HANDLE {
1407 use std::collections::hash_map::Entry;
1408 with_wave_state(|ws| {
1409 if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
1410 self.binding.retain_handle(current_cache);
1411 slot.insert(current_cache);
1412 }
1413 });
1414 }
1415 // Slice G (D1 patch, 2026-05-09): mark this node as having
1416 // emitted tier-3 in this wave on the per-thread tracker.
1417 tier3_mark(node_id);
1418 self.queue_notify(&mut s, node_id, Message::Resolved);
1419 let child_ids: Vec<NodeId> = s
1420 .children
1421 .get(&node_id)
1422 .map(|c| c.iter().copied().collect())
1423 .unwrap_or_default();
1424 // /qa A7 fix (2026-05-09): collect auto-resolve inserts
1425 // during the loop and bulk-insert into pending_auto_resolve
1426 // once after the loop (originally one cross_partition acquire).
1427 // Pre-fix the loop acquired `cross_partition` once per
1428 // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
1429 // which is N mutex hops for an N-child cascade. Cannot
1430 // hoist to acquire-cps-before-loop because `queue_notify`
1431 // (called inside the loop) also acquires cross_partition
1432 // for `pending_pause_overflow.push` in the rare overflow
1433 // case — re-entrance on the non-reentrant Mutex would
1434 // self-deadlock.
1435 let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
1436 for child_id in child_ids {
1437 let already_involved = s.require_node(child_id).involved_this_wave;
1438 if !already_involved {
1439 {
1440 let child = s.require_node_mut(child_id);
1441 child.involved_this_wave = true;
1442 child.dirty = true;
1443 }
1444 self.queue_notify(&mut s, child_id, Message::Dirty);
1445 // pending_auto_resolve now lives on per-thread WaveState
1446 // (it moved there from CrossPartitionState). Deferred to the
1447 // after-loop bulk insert per the /qa A7 fix above.
1448 auto_resolve_inserts.push(child_id);
1449 }
1450 }
1451 // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
1452 // single WaveState borrow for the bulk-insert. queue_notify
1453 // above no longer holds the WaveState borrow by the time we
1454 // reach here, so this borrow is uncontested.
1455 if !auto_resolve_inserts.is_empty() {
1456 with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
1457 }
1458 }
1459 }
1460
1461 /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1462 /// emit arrives while a prior tier-3 message is still pending), rewrite
1463 /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1464 /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1465 /// Touches both `pending_notify` (immediate-flush path) and the per-node
1466 /// pause buffer (paused path).
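/**
Example (a minimal sketch over a plain slice, not the real pending-notify
or pause-buffer types): the rewrite loop below. A hypothetical
`rewrite_resolved` turns each queued `Resolved` into `Data(snapshot)` and
returns the rewrite count, which plays the role of `retains_needed` (one
fresh payload retain per rewritten entry). Without a wave-start snapshot
there is nothing to rewrite to, so the queue is left alone.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Dirty,
    Data(u32),
    Resolved,
}

/// Rewrite queued `Resolved` entries to `Data(snapshot)`; returns how
/// many entries changed.
fn rewrite_resolved(queue: &mut [Msg], snapshot: Option<u32>) -> u32 {
    // `None` models a sentinel (NO_HANDLE) cache at wave start.
    let Some(snap) = snapshot else {
        return 0;
    };
    let mut rewrites = 0;
    for msg in queue.iter_mut() {
        if *msg == Msg::Resolved {
            *msg = Msg::Data(snap);
            rewrites += 1;
        }
    }
    rewrites
}
```

In a wave where the node first re-emitted its cached value (queued
`Resolved`) and then emitted a new value, the flushed wave becomes
`[Dirty, Data(cached), Data(new)]`, which satisfies R1.3.3.a.
*/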
1467 fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1468 let mut s = self.lock_state();
1469 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
1470 // and pending_notify both live on per-thread WaveState. Single
1471 // WaveState borrow handles both the snapshot lookup and the
1472 // pending_notify rewrite; the pause-buffer path uses the state
1473 // lock and is independent of WaveState.
1474 let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
1475 Some(h) if h != NO_HANDLE => h,
1476 // No snapshot available — the prior Resolved was queued without
1477 // a cache (sentinel pre-emit). Nothing to rewrite to; the
1478 // multi-emit case from sentinel is fine (verbatim Data path).
1479 _ => return,
1480 };
1481 let mut retains_needed = 0u32;
1482 // pending_notify path. Walk all batches' messages: Slice-G
1483 // coalescing reasons about wave-content per node, not per-batch.
1484 with_wave_state(|ws| {
1485 if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
1486 for msg in entry.iter_messages_mut() {
1487 if matches!(msg, Message::Resolved) {
1488 *msg = Message::Data(snapshot);
1489 retains_needed += 1;
1490 }
1491 }
1492 }
1493 });
1494 // Pause-buffer path.
1495 if let Some(rec) = s.nodes.get_mut(&node_id) {
1496 if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1497 for msg in &mut *buffer {
1498 if matches!(msg, Message::Resolved) {
1499 *msg = Message::Data(snapshot);
1500 retains_needed += 1;
1501 }
1502 }
1503 }
1504 }
1505 drop(s);
1506 // Each rewritten Resolved → Data adds a payload retain that
1507 // queue_notify would otherwise have taken at emit time. The
1508 // snapshot already owns one retain (taken when cache was
1509 // snapshotted); we need one fresh retain per rewrite.
1510 for _ in 0..retains_needed {
1511 self.binding.retain_handle(snapshot);
1512 }
1513 }
1514
1515 /// Equals check that crosses the binding boundary lock-released for
1516 /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1517 fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1518 if a == b {
1519 return true; // identity-on-handles always sufficient
1520 }
1521 if a == NO_HANDLE || b == NO_HANDLE {
1522 return false;
1523 }
1524 match mode {
1525 EqualsMode::Identity => false,
1526 EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1527 }
1528 }
1529
1530 /// Commit a DATA emission **without** equals substitution. Used by
1531 /// `FnResult::Batch` processing, operator dispatch, and the multi-emit
1532 /// path of [`Self::commit_emission`]; all pass verbatim per R1.3.2.d /
1533 /// R1.3.3.c. DIRTY auto-prefix obeys R1.3.1.a (b): only if not already dirty.
1534 ///
1535 /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1536 /// but skips the Phase 2 equals check entirely.
1537 fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1538 assert!(
1539 new_handle != NO_HANDLE,
1540 "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1541 );
1542
1543 let mut s = self.lock_state();
1544 let rec = s.require_node(node_id);
1545 if rec.terminal.is_some() {
1546 drop(s);
1547 self.binding.release_handle(new_handle);
1548 return;
1549 }
1550
1551 // R1.3.1.a condition (b): DIRTY only if not already dirty.
1552 let already_dirty = s.require_node(node_id).dirty;
1553 s.require_node_mut(node_id).dirty = true;
1554 if !already_dirty {
1555 self.queue_notify(&mut s, node_id, Message::Dirty);
1556 }
1557
1558 // Always DATA — no equals substitution for Batch emissions.
1559 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1560 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1561 // `in_tick` is per-(Core, thread); read on the wave-owner thread.
1562 let current_cache = s.require_node(node_id).cache;
1563 let snapshot_taken = if self.in_tick() && current_cache != NO_HANDLE {
1564 use std::collections::hash_map::Entry;
1565 with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1566 Entry::Vacant(slot) => {
1567 slot.insert(current_cache);
1568 true
1569 }
1570 Entry::Occupied(_) => false,
1571 })
1572 } else {
1573 false
1574 };
1575 s.require_node_mut(node_id).cache = new_handle;
1576 if current_cache != NO_HANDLE && !snapshot_taken {
1577 self.binding.release_handle(current_cache);
1578 }
1579 // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1580 self.push_replay_buffer(&mut s, node_id, new_handle);
1581 // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
1582 // tier3_emitted_this_wave on the per-thread tracker even on the
1583 // verbatim path. A subsequent commit_emission at the same node
1584 // in the same wave needs this flag to detect multi-emit and
1585 // skip equals substitution; without it, a Batch-then-standard
1586 // sequence would queue Resolved into a wave that already has
1587 // Data — violating R1.3.3.a. The Batch path itself still
1588 // passes verbatim per R1.3.3.c (we don't re-run equals here);
1589 // we just record that "this node has emitted tier-3 in this
1590 // wave."
1591 tier3_mark(node_id);
1592 self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1593 // Propagate to children
1594 let child_ids: Vec<NodeId> = s
1595 .children
1596 .get(&node_id)
1597 .map(|c| c.iter().copied().collect())
1598 .unwrap_or_default();
1599 for child_id in child_ids {
1600 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1601 if let Some(idx) = dep_idx {
1602 self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1603 }
1604 }
1605 }
1606
1607 /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
1608 /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
1609 /// fresh retain on push. RESOLVED is NOT buffered per canonical
1610 /// "DATA only" — call sites only invoke this for Data emissions.
1611 ///
1612 /// The evicted handle is queued into the per-thread WaveState's
1613 /// `deferred_handle_releases` (released lock-released at flush time)
1614 /// per the binding-boundary lock-release discipline: `release_handle`
1615 /// may re-enter Core via finalizers and must not run while the state
1616 /// lock is held (QA A3, 2026-05-07). Q2 (2026-05-09) moved the queue
1617 /// to CrossPartitionState; it now lives on WaveState. This fn touches
1618 /// WaveState only when an eviction actually happens (the common
1619 /// no-eviction case skips the borrow).
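/**
Example (a minimal sketch, not the real node record): a bounded FIFO
replay buffer whose evictions are deferred instead of released inline.
`ReplayBuffer`, its fields, and `flush` are hypothetical; `cap == 0`
stands in for "not opted in" (the real cap is an `Option`), and the
retain taken on push is not modeled.

```rust
use std::collections::VecDeque;

/// Toy replay buffer (hypothetical) over `u32` handle ids.
struct ReplayBuffer {
    cap: usize,
    buf: VecDeque<u32>,
    deferred_releases: Vec<u32>,
}

impl ReplayBuffer {
    fn new(cap: usize) -> Self {
        ReplayBuffer {
            cap,
            buf: VecDeque::new(),
            deferred_releases: Vec::new(),
        }
    }

    /// Push a DATA handle; `cap == 0` models "not opted in".
    fn push(&mut self, handle: u32) {
        if self.cap == 0 {
            return;
        }
        self.buf.push_back(handle);
        if self.buf.len() > self.cap {
            // Evict the oldest entry but defer its release to flush time.
            if let Some(evicted) = self.buf.pop_front() {
                self.deferred_releases.push(evicted);
            }
        }
    }

    /// Flush time: hand back the deferred releases (the real code releases
    /// them with no lock held).
    fn flush(&mut self) -> Vec<u32> {
        std::mem::take(&mut self.deferred_releases)
    }
}
```

Deferring the release keeps any finalizer that `release_handle` might
trigger out of the locked region.
*/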
1620 fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
1621 let rec = s.require_node_mut(node_id);
1622 let cap = match rec.replay_buffer_cap {
1623 Some(c) if c > 0 => c,
1624 _ => return,
1625 };
1626 self.binding.retain_handle(new_handle);
1627 rec.replay_buffer.push_back(new_handle);
1628 let evicted = if rec.replay_buffer.len() > cap {
1629 rec.replay_buffer.pop_front()
1630 } else {
1631 None
1632 };
1633 if let Some(h) = evicted {
1634 with_wave_state(|ws| ws.deferred_handle_releases.push(h));
1635 }
1636 }
1637
1638 // ===================================================================
1639 // Operator dispatch (Slice C-1, D009).
1640 //
1641 // `fire_operator` is the entry point for nodes whose `kind` is
1642 // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
1643 // to per-operator helpers that snapshot inputs under the lock, drop the
1644 // lock to call the binding's bulk projection FFI, and reacquire to
1645 // apply emissions via `commit_emission_verbatim` (no per-item equals
1646 // dedup at the wire — operator output passes verbatim per the same
1647 // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1648 //
1649 // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1650 // share retains owned by the wave's data-batch slot (released at
1651 // wave-end rotation in `clear_wave_state`). Operators that emit those
1652 // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1653 // `prev` carry-over) take an additional retain via `retain_handle`
1654 // before passing to `commit_emission_verbatim` — the cache slot owns
1655 // its own share, independent of the data-batch slot's. Operators that
1656 // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1657 // packed tuples) receive retains pre-bumped by the binding's bulk-
1658 // projection method.
1659 // ===================================================================
1660
1661 /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1662 /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1663 /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
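/**
Example (a minimal sketch of the phase-1 skip decision only): the
hypothetical `DepView` and `should_fire` below assume that
`has_sentinel_deps()` means "some dep has not delivered a real handle
yet". A terminal node never fires; otherwise the operator fires in
partial mode, when every dep has a handle, or when any dep is terminal.

```rust
/// Per-dep view used by the toy gate (hypothetical field names).
struct DepView {
    has_handle: bool,
    terminal: bool,
}

/// Toy first-run gate mirroring the `proceed` computation.
fn should_fire(node_terminal: bool, partial: bool, deps: &[DepView]) -> bool {
    if node_terminal {
        return false;
    }
    let has_sentinel_deps = deps.iter().any(|d| !d.has_handle);
    // A dep terminal counts as real input (lets Reduce emit its seed).
    let has_real_input = !has_sentinel_deps || deps.iter().any(|d| d.terminal);
    partial || has_real_input
}
```
*/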
1664 fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1665 // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1666 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
1667 // per-thread WaveState; state lock + WaveState borrow are
1668 // independent.
1669 let proceed = {
1670 let s = self.lock_state();
1671 with_wave_state(|ws| {
1672 ws.pending_fires.remove(&node_id);
1673 });
1674 let rec = s.require_node(node_id);
1675 if rec.terminal.is_some() {
1676 false
1677 } else {
1678 // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1679 // (D011) opt out of the gate; otherwise we wait for every
1680 // dep to have delivered at least one real handle. A dep
1681 // terminal also counts as "real input" for every operator
1682 // kind, which lets terminal-aware operators (currently
1683 // `Reduce`) fire on upstream COMPLETE-without-DATA and emit the seed.
1684 let has_real_input = !rec.has_sentinel_deps()
1685 || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1686 rec.partial || has_real_input
1687 }
1688 };
1689 if !proceed {
1690 return;
1691 }
1692
1693 // A6 (Slice F, 2026-05-07): track operator fire on the
1694 // `currently_firing` stack so a binding-side project/predicate/fold
1695 // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1696 // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1697 // the stack on panic too.
1698 let _firing = FiringGuard::new(self, node_id);
1699
1700 match op {
1701 OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1702 OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1703 OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1704 OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1705 OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1706 self.fire_op_distinct(node_id, equals_fn_id);
1707 }
1708 OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1709 OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1710 OperatorOp::WithLatestFrom { pack_fn } => {
1711 self.fire_op_with_latest_from(node_id, pack_fn);
1712 }
1713 OperatorOp::Merge => self.fire_op_merge(node_id),
1714 OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1715 OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1716 OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1717 // The variant carries `default` for `register_operator`'s
1718 // `make_op_scratch` path; once registered, the live default
1719 // is read from `LastState::default` inside `fire_op_last`.
1720 OperatorOp::Last { .. } => self.fire_op_last(node_id),
1721 OperatorOp::Tap { fn_id } => self.fire_op_tap(node_id, fn_id),
1722 OperatorOp::TapFirst { fn_id } => self.fire_op_tap_first(node_id, fn_id),
1723 OperatorOp::Valve => self.fire_op_valve(node_id),
1724 OperatorOp::Settle {
1725 quiet_waves,
1726 max_waves,
1727 } => self.fire_op_settle(node_id, quiet_waves, max_waves),
1728 }
1729 }
1730
1731 /// Snapshot the operator's single dep batch (transform constraint —
1732 /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1733 /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1734 /// `dep_records[0].terminal` at snapshot time.
1735 fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1736 let s = self.lock_state();
1737 let rec = s.require_node(node_id);
1738 debug_assert!(
1739 !rec.dep_records.is_empty(),
1740 "transform operator must have ≥1 dep"
1741 );
1742 let dr = &rec.dep_records[0];
1743 (dr.data_batch.iter().copied().collect(), dr.terminal)
1744 }
1745
1746 /// Emit DIRTY (if not already dirty) followed by RESOLVED, unless a
1747 /// tier-3 message is already pending. Used by silent-drop operators
1748 /// (Filter / DistinctUntilChanged / Pairwise) and Combine's INVALIDATE
1749 /// guard when a wave produces no DATA and must still settle for
1750 /// subscribers (D018: let DIRTY ride; queue RESOLVED on full-reject).
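/**
Example (a minimal sketch over a plain `Vec`, not the real WaveState):
the settle rule. The hypothetical `settle` queues DIRTY once, then
RESOLVED only if no tier-3 message (DATA or RESOLVED) is already queued,
so a wave never mixes DATA with RESOLVED (R1.3.3.a). The terminal
short-circuit is omitted.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Dirty,
    Data(u32),
    Resolved,
}

impl Msg {
    /// DATA and RESOLVED are the tier-3 (settling) messages.
    fn is_tier3(&self) -> bool {
        matches!(self, Msg::Data(_) | Msg::Resolved)
    }
}

/// Settle a wave that produced no new DATA from this operator.
fn settle(queue: &mut Vec<Msg>, dirty: &mut bool) {
    if !*dirty {
        *dirty = true;
        queue.push(Msg::Dirty);
    }
    if !queue.iter().any(Msg::is_tier3) {
        queue.push(Msg::Resolved);
    }
}
```
*/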
1751 fn settle_dirty_resolved(&self, node_id: NodeId) {
1752 let mut s = self.lock_state();
1753 if s.require_node(node_id).terminal.is_some() {
1754 return;
1755 }
1756 let already_dirty = s.require_node(node_id).dirty;
1757 s.require_node_mut(node_id).dirty = true;
1758 if !already_dirty {
1759 self.queue_notify(&mut s, node_id, Message::Dirty);
1760 }
1761 // Slice G: skip Resolved if pending_notify already has a tier-3
1762 // message — adding Resolved would violate R1.3.3.a.
1763 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
1764 // on per-thread WaveState; borrow scoped so queue_notify can
1765 // re-borrow.
1766 let already_tier3 = with_wave_state(|ws| {
1767 ws.pending_notify
1768 .get(&node_id)
1769 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1770 });
1771 if !already_tier3 {
1772 self.queue_notify(&mut s, node_id, Message::Resolved);
1773 }
1774 }
1775
1776 /// `OperatorOp::Map` dispatch.
1777 fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1778 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1779 // Mark fired regardless of input count (activation gate already
1780 // satisfied or partial-mode).
1781 {
1782 let mut s = self.lock_state();
1783 s.require_node_mut(node_id).has_fired_once = true;
1784 }
1785 if inputs.is_empty() {
1786 return;
1787 }
1788 // Phase 2 (lock-released): bulk project. Binding returns one
1789 // handle per input, each with a retain share already taken.
1790 let outputs = self.binding.project_each(fn_id, &inputs);
1791 // Phase 3: emit each output. `commit_emission_verbatim` consumes
1792 // the retain into the cache slot (and releases the prior cache
1793 // handle internally).
1794 for h in outputs {
1795 self.commit_emission_verbatim(node_id, h);
1796 }
1797 }
1798
1799 /// `OperatorOp::Filter` dispatch (D012/D018).
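/**
Example (a minimal sketch with plain `u32` values instead of handles):
the per-wave Filter outcome. The hypothetical `filter_wave` emits every
passing input as DATA; if a non-empty batch is fully rejected, it settles
with a single RESOLVED; an empty batch produces nothing. The DIRTY prefix
and refcounting are not modeled.

```rust
#[derive(Debug, PartialEq)]
enum Out {
    Data(u32),
    Resolved,
}

fn filter_wave(inputs: &[u32], pred: impl Fn(u32) -> bool) -> Vec<Out> {
    let out: Vec<Out> = inputs
        .iter()
        .copied()
        .filter(|&h| pred(h))
        .map(Out::Data)
        .collect();
    // D018: a fully rejected (non-empty) batch still settles the wave.
    if out.is_empty() && !inputs.is_empty() {
        vec![Out::Resolved]
    } else {
        out
    }
}
```
*/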
1800 fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1801 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1802 {
1803 let mut s = self.lock_state();
1804 s.require_node_mut(node_id).has_fired_once = true;
1805 }
1806 if inputs.is_empty() {
1807 return;
1808 }
1809 // Phase 2: predicate per input.
1810 let pass = self.binding.predicate_each(fn_id, &inputs);
1811 // Slice V2: promoted from debug_assert! — binding contract violation
1812 // should fail loud in release builds too.
1813 assert!(
1814 pass.len() == inputs.len(),
1815 "predicate_each returned {} bools for {} inputs",
1816 pass.len(),
1817 inputs.len()
1818 );
1819 // Phase 3: emit passing items verbatim. Take a fresh retain for
1820 // each — the data_batch slot still owns its retain (released at
1821 // wave-end rotation), and the cache slot needs its own.
1822 let mut emitted = 0usize;
1823 for (i, &h) in inputs.iter().enumerate() {
1824 if pass.get(i).copied().unwrap_or(false) {
1825 self.binding.retain_handle(h);
1826 self.commit_emission_verbatim(node_id, h);
1827 emitted += 1;
1828 }
1829 }
1830 // D018: full-reject settles with DIRTY+RESOLVED.
1831 if emitted == 0 {
1832 self.settle_dirty_resolved(node_id);
1833 }
1834 }
1835
1836 /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
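/**
Example (a minimal sketch, assuming `fold_each` threads the accumulator
through the inputs in order): the hypothetical `scan_wave` returns every
intermediate accumulator (one emission per input) plus the final one,
which is what the `ScanState.acc` slot keeps for the next wave.

```rust
fn scan_wave(acc: i64, inputs: &[i64], f: impl Fn(i64, i64) -> i64) -> (Vec<i64>, i64) {
    let mut emitted = Vec::with_capacity(inputs.len());
    let mut cur = acc;
    for &x in inputs {
        cur = f(cur, x);
        emitted.push(cur); // each intermediate acc is emitted verbatim
    }
    (emitted, cur)
}
```

A running sum seeded with `0` over `[1, 2, 3]` emits `1, 3, 6` and
persists `6`; the next wave continues from there.
*/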
1837 fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1838 use crate::op_state::ScanState;
1839 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1840 let acc = {
1841 let s = self.lock_state();
1842 scratch_ref::<ScanState>(&s, node_id).acc
1843 };
1844 {
1845 let mut s = self.lock_state();
1846 s.require_node_mut(node_id).has_fired_once = true;
1847 }
1848 if inputs.is_empty() {
1849 return;
1850 }
1851 // Phase 2: fold each input through. Returns N new handles, each
1852 // with a fresh retain.
1853 let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1854 // Slice V2: promoted from debug_assert! — binding contract violation.
1855 assert!(
1856 new_states.len() == inputs.len(),
1857 "fold_each returned {} accs for {} inputs",
1858 new_states.len(),
1859 inputs.len()
1860 );
1861 // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1862 // extra retain for the slot; release the prior acc's slot retain.
1863 let last_acc = new_states.last().copied();
1864 if let Some(last) = last_acc {
1865 let prev_acc = {
1866 let mut s = self.lock_state();
1867 let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1868 let prev = scratch.acc;
1869 scratch.acc = last;
1870 prev
1871 };
1872 // Take the slot's retain on the new acc.
1873 self.binding.retain_handle(last);
1874 // Release the prior slot's retain (post-lock to keep binding
1875 // free to re-enter Core safely).
1876 if prev_acc != crate::handle::NO_HANDLE {
1877 self.binding.release_handle(prev_acc);
1878 }
1879 }
1880 // Phase 3b: emit each intermediate acc verbatim. `new_states`
1881 // entries each carry one retain from `fold_each`; that retain is
1882 // consumed by `commit_emission_verbatim` into the cache slot.
1883 for h in new_states {
1884 self.commit_emission_verbatim(node_id, h);
1885 }
1886 }
1887
1888 /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1889 /// upstream COMPLETE (cascades ERROR verbatim).
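/**
Example (a minimal sketch with plain values; `Out`, `Upstream`, and
`reduce_fire` are hypothetical): Reduce folds silently while upstream is
open, emits `Data(acc)` followed by COMPLETE when upstream completes
(the seed if no DATA ever arrived), and forwards ERROR without emitting
the accumulator.

```rust
#[derive(Debug, PartialEq)]
enum Out {
    Data(i64),
    Complete,
    Error(&'static str),
}

/// Upstream status observed on this fire.
enum Upstream {
    Open,
    Complete,
    Error(&'static str),
}

fn reduce_fire(
    acc: &mut i64,
    inputs: &[i64],
    upstream: Upstream,
    f: impl Fn(i64, i64) -> i64,
) -> Vec<Out> {
    // Silent accumulation: no per-input emission.
    for &x in inputs {
        *acc = f(*acc, x);
    }
    match upstream {
        Upstream::Open => Vec::new(),
        Upstream::Complete => vec![Out::Data(*acc), Out::Complete],
        Upstream::Error(e) => vec![Out::Error(e)],
    }
}
```
*/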
1890 fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1891 use crate::op_state::ReduceState;
1892 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1893 let acc = {
1894 let s = self.lock_state();
1895 scratch_ref::<ReduceState>(&s, node_id).acc
1896 };
1897 {
1898 let mut s = self.lock_state();
1899 s.require_node_mut(node_id).has_fired_once = true;
1900 }
1901 // Phase 2: accumulate (silent — no per-input emit).
1902 let new_states = if inputs.is_empty() {
1903 SmallVec::<[HandleId; 1]>::new()
1904 } else {
1905 self.binding.fold_each(fn_id, acc, &inputs)
1906 };
1907 // Slice V2: promoted from debug_assert! — binding contract violation.
1908 assert!(
1909 new_states.len() == inputs.len(),
1910 "fold_each returned {} accs for {} inputs",
1911 new_states.len(),
1912 inputs.len()
1913 );
1914 // Update ReduceState.acc to last new acc; release intermediate
1915 // states (we don't emit them) and the prior acc's slot retain.
1916 let last_acc = new_states.last().copied();
1917 let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1918 new_states[..new_states.len() - 1].to_vec()
1919 } else {
1920 Vec::new()
1921 };
1922 let prev_acc_to_release = if let Some(last) = last_acc {
1923 let prev_acc = {
1924 let mut s = self.lock_state();
1925 let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1926 let prev = scratch.acc;
1927 scratch.acc = last;
1928 prev
1929 };
1930 // Slot adopts fold_each's share on `last` (not emitted per-wave); an extra retain here would leak.
1931 if prev_acc == crate::handle::NO_HANDLE {
1932 None
1933 } else {
1934 Some(prev_acc)
1935 }
1936 } else {
1937 None
1938 };
1939 // Release intermediate fold results (Reduce only emits the LAST,
1940 // but only on terminal). Each was retained by `fold_each`.
1941 for h in intermediates_to_release {
1942 self.binding.release_handle(h);
1943 }
1944 if let Some(h) = prev_acc_to_release {
1945 self.binding.release_handle(h);
1946 }
1947
1948 // Phase 3: emit on terminal.
1949 match terminal {
1950 None => {
1951 // Still accumulating; no emit. Subscribers see no message
1952 // for this wave (silent accumulation). The first wave that
1953 // pushes Reduce to fire produces a Dirty entry on the
1954 // upstream's commit, but Reduce itself doesn't queue any
1955 // tier-3 since R5 silently absorbs. v1: the post-drain
1956 // auto-resolve sweep is a no-op for Reduce, because
1957 // pending_notify holds no entry for it and there is
1958 // nothing to settle.
1959 }
1960 Some(TerminalKind::Complete) => {
1961 // Read the live acc (may be the seed if no DATA arrived)
1962 // and emit Data(acc) + Complete.
1963 let final_acc = {
1964 let s = self.lock_state();
1965 scratch_ref::<ReduceState>(&s, node_id).acc
1966 };
1967 if final_acc != crate::handle::NO_HANDLE {
1968 // Emission needs its own retain (slot's retain is
1969 // owned by ReduceState.acc until reset/Drop).
1970 self.binding.retain_handle(final_acc);
1971 self.commit_emission_verbatim(node_id, final_acc);
1972 }
1973 self.complete(node_id);
1974 }
1975 Some(TerminalKind::Error(h)) => {
1976 // Core::error transfers the caller's share into the
1977 // cascade (node.terminal + per-child dep_terminal slots);
1978 // no release at the error() boundary. Take a fresh share
1979 // here so the cascade owns it independently of the
1980 // dep_records[0].terminal slot's share.
1981 self.binding.retain_handle(h);
1982 self.error(node_id, h);
1983 }
1984 }
1985 }
1986
1987 /// `OperatorOp::DistinctUntilChanged` dispatch.
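/**
Example (a minimal sketch of the share discipline, not the binding
API): a hypothetical `Refs` table stands in for `retain_handle` /
`release_handle`, and equality is identity only (`custom_equals` is not
modeled). `slot` owns one share of its handle; a working-copy share on
the initial `prev` lets the loop release the old `prev` uniformly; at
the end `prev` holds exactly one share, which moves into the slot while
the slot's original share is released. The assertions check that no
share leaks when the last emitted value is the slot's original handle.

```rust
use std::collections::HashMap;

/// Toy refcount table (hypothetical).
#[derive(Default)]
struct Refs(HashMap<u32, i32>);

impl Refs {
    fn retain(&mut self, h: u32) {
        *self.0.entry(h).or_insert(0) += 1;
    }
    fn release(&mut self, h: u32) {
        let c = self.0.get_mut(&h).expect("release of unknown handle");
        *c -= 1;
        assert!(*c >= 0, "double release of handle {}", h);
    }
    fn count(&self, h: u32) -> i32 {
        self.0.get(&h).copied().unwrap_or(0)
    }
}

/// One toy fire; every emitted handle carries a fresh share.
fn distinct_fire(refs: &mut Refs, slot: &mut Option<u32>, inputs: &[u32]) -> Vec<u32> {
    let mut prev = *slot;
    // Working-copy share on the initial prev.
    if let Some(p) = prev {
        refs.retain(p);
    }
    let mut emitted = Vec::new();
    for &h in inputs {
        if prev == Some(h) {
            continue; // equal to prev: suppressed
        }
        refs.retain(h); // share handed to the emission
        emitted.push(h);
        refs.retain(h); // share held by the new prev
        if let Some(old) = prev.replace(h) {
            refs.release(old);
        }
    }
    // Move prev's single share into the slot; drop the slot's old share.
    if let Some(stale) = std::mem::replace(slot, prev) {
        refs.release(stale);
    }
    emitted
}
```
*/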
1988 fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1989 use crate::op_state::DistinctState;
1990 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1991 let mut prev = {
1992 let s = self.lock_state();
1993 scratch_ref::<DistinctState>(&s, node_id).prev
1994 };
1995 {
1996 let mut s = self.lock_state();
1997 s.require_node_mut(node_id).has_fired_once = true;
1998 }
1999 if inputs.is_empty() {
2000 return;
2001 }
2002 // Take a working-copy retain on the initial prev so both the loop
2003 // (which releases old_prev on each non-equal item) and phase 3
2004 // (which releases the slot's original handle) each have their own
2005 // share. Without this, the loop's release of old_prev (== original
2006 // DistinctState.prev) double-releases against phase 3's stale_slot
2007 // release.
2008 if prev != crate::handle::NO_HANDLE {
2009 self.binding.retain_handle(prev);
2010 }
2011 // Phase 2: per-input equals(prev, current). Each non-equal input
2012 // is emitted and becomes the new prev. Equals fn_id reuses
2013 // `BindingBoundary::custom_equals`.
2014 let mut emitted = 0usize;
2015 for &h in &inputs {
2016 let equal = if prev == crate::handle::NO_HANDLE {
2017 false
2018 } else if prev == h {
2019 true
2020 } else {
2021 self.binding.custom_equals(equals_fn_id, prev, h)
2022 };
2023 if !equal {
2024 // Emit this input verbatim.
2025 self.binding.retain_handle(h);
2026 self.commit_emission_verbatim(node_id, h);
2027 // Update prev: take retain on new prev, release old
2028 // (working-copy retain from above or from prior iteration).
2029 self.binding.retain_handle(h);
2030 let old_prev = prev;
2031 prev = h;
2032 if old_prev != crate::handle::NO_HANDLE {
2033 self.binding.release_handle(old_prev);
2034 }
2035 emitted += 1;
2036 }
2037 }
2038 // Phase 3: persist prev into the DistinctState.prev slot. After the
2039 // loop, `prev` carries exactly one working share: the working-copy
2040 // retain if nothing was emitted, else the retain taken when it last
2041 // became prev. That share moves into the slot, so the slot's original
2042 // share on `stale_slot` is released unconditionally. Releasing it only
2043 // when `stale_slot != prev` leaks a share whenever the last emitted
2044 // item is the slot's original handle (e.g. inputs `[other, original]`).
2045 let stale_slot = {
2046 let mut s = self.lock_state();
2047 let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
2048 let stale = scratch.prev;
2049 scratch.prev = prev;
2050 stale
2051 };
2052 // Release post-lock: `release_handle` may re-enter Core via
2053 // finalizers.
2054 if stale_slot != crate::handle::NO_HANDLE {
2055 self.binding.release_handle(stale_slot);
2056 }
2057 // On the no-emit path `prev` is still the original handle and its
2058 // working-copy share now backs the slot, so no extra release is
2059 // needed there.
2060 if emitted == 0 {
2061 self.settle_dirty_resolved(node_id);
2062 }
2063 }
2064
2065 /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
2066 /// starting with the second value (the first value ever seen is swallowed and only seeds `prev`).
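/**
Example (a minimal sketch with plain values; tuple packing via
`pairwise_pack` and refcounting are not modeled): the hypothetical
`pairwise_fire` keeps `prev` across fires, swallows the first value ever
seen, and emits `(prev, current)` for every later input.

```rust
fn pairwise_fire(prev: &mut Option<u32>, inputs: &[u32]) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for &h in inputs {
        // `replace` stores h as the new prev and yields the old one.
        if let Some(p) = prev.replace(h) {
            out.push((p, h));
        }
    }
    out
}
```

An empty result from a non-empty batch is the case the real operator
settles with DIRTY + RESOLVED.
*/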
2067 fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2068 use crate::op_state::PairwiseState;
2069 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2070 let mut prev = {
2071 let s = self.lock_state();
2072 scratch_ref::<PairwiseState>(&s, node_id).prev
2073 };
2074 {
2075 let mut s = self.lock_state();
2076 s.require_node_mut(node_id).has_fired_once = true;
2077 }
2078 if inputs.is_empty() {
2079 return;
2080 }
2081 let mut emitted = 0usize;
2082 for &h in &inputs {
2083 if prev == crate::handle::NO_HANDLE {
2084 // First-ever value — swallow, set prev. Retain for the
2085 // PairwiseState.prev slot (persisted in phase 3 below).
2086 self.binding.retain_handle(h);
2087 prev = h;
2088 continue;
2089 }
2090 // Pack (prev, current) into a tuple handle. Binding returns a
2091 // fresh retain on the packed handle.
2092 let packed = self.binding.pairwise_pack(fn_id, prev, h);
2093 self.commit_emission_verbatim(node_id, packed);
2094 // Advance prev: take retain on h, release old prev.
2095 self.binding.retain_handle(h);
2096 let old_prev = prev;
2097 prev = h;
2098 self.binding.release_handle(old_prev);
2099 emitted += 1;
2100 }
2101 // Persist prev into the PairwiseState.prev slot. Do NOT release
2102 // the stale slot handle here: when the slot held a handle, the
2103 // loop's first iteration already released that share as
2104 // `old_prev`, and `prev` now carries the share taken when it last
2105 // became prev. Releasing `stale_slot` again (as the DistinctState
2106 // path does, guarded by its working-copy retain) would
2107 // double-release it.
2108 {
2109 let mut s = self.lock_state();
2110 scratch_mut::<PairwiseState>(&mut s, node_id).prev = prev;
2111 }
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    // =================================================================
    // Slice C-2: multi-dep combinator operators (D020)
    // =================================================================

    /// Snapshot all deps' "latest" handle for multi-dep combinators.
    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
    /// this wave), else `prev_data` (last handle from previous wave).
    /// Also returns whether dep[0] (primary) had DATA this wave —
    /// needed by `fire_op_with_latest_from`.
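    ///
    /// A minimal sketch of the "latest" rule, assuming a hypothetical
    /// `Dep` record reduced to `data_batch` and `prev_data`, with `u32`
    /// values as handles and `None` standing in for `NO_HANDLE`:
    ///
    /// ```rust
    /// struct Dep {
    ///     data_batch: Vec<u32>,
    ///     prev_data: Option<u32>,
    /// }
    ///
    /// // Per dep: the last value delivered this wave, else the previous
    /// // wave's value. Also reports whether dep[0] delivered this wave.
    /// fn all_latest(deps: &[Dep]) -> (Vec<Option<u32>>, bool) {
    ///     let primary_fired = deps.first().is_some_and(|d| !d.data_batch.is_empty());
    ///     let latest = deps
    ///         .iter()
    ///         .map(|d| d.data_batch.last().copied().or(d.prev_data))
    ///         .collect();
    ///     (latest, primary_fired)
    /// }
    /// ```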
    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
        let s = self.lock_state();
        let rec = s.require_node(node_id);
        let primary_fired = rec
            .dep_records
            .first()
            .is_some_and(|dr| !dr.data_batch.is_empty());
        let latest: SmallVec<[HandleId; 4]> = rec
            .dep_records
            .iter()
            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
            .collect();
        (latest, primary_fired)
    }

    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
    /// latest handle per dep into a tuple via `pack_tuple`, emits on
    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
    /// instead of packing a NO_HANDLE into the tuple.
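    ///
    /// A minimal sketch of the emit-or-settle decision, assuming `u32`
    /// values as handles, `None` as an invalidated dep (`NO_HANDLE`), and a
    /// `Vec` in place of the packed tuple; `combine_fire` is hypothetical.
    ///
    /// ```rust
    /// // Some(tuple) means "emit"; None means "settle DIRTY+RESOLVED".
    /// fn combine_fire(latest: &[Option<u32>]) -> Option<Vec<u32>> {
    ///     // Collecting into Option<Vec<_>> yields None as soon as any dep is
    ///     // None, which mirrors the post-warmup INVALIDATE guard.
    ///     latest.iter().copied().collect()
    /// }
    /// ```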
    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // Post-warmup INVALIDATE guard: a dep may have been invalidated
        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
        if latest.contains(&crate::handle::NO_HANDLE) {
            self.settle_dirty_resolved(node_id);
            return;
        }
        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
        self.commit_emission_verbatim(node_id, tuple_handle);
    }

    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
    /// (D021 / Phase 10.5). Emits a `[primary, secondary]` pair on the
    /// first fire (gate release) and thereafter only when dep[0]
    /// (primary) has DATA in the wave. If only dep[1] fires → RESOLVED.
    /// Post-warmup INVALIDATE guard: if the secondary's latest handle is
    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
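    ///
    /// A minimal sketch of the fire-on-primary rule, assuming `u32` values
    /// as handles and `None` as `NO_HANDLE`; `with_latest_from_fire` is
    /// hypothetical. Unlike the dispatch, which relies on the primary slot
    /// being populated, the sketch also treats a missing primary as "settle".
    ///
    /// ```rust
    /// // Some(pair) means "emit"; None means "settle DIRTY+RESOLVED".
    /// fn with_latest_from_fire(
    ///     first_fire: bool,
    ///     primary_fired: bool,
    ///     latest: [Option<u32>; 2],
    /// ) -> Option<(u32, u32)> {
    ///     // After warmup, a secondary-only update produces no DATA.
    ///     if !first_fire && !primary_fired {
    ///         return None;
    ///     }
    ///     match latest {
    ///         [Some(primary), Some(secondary)] => Some((primary, secondary)),
    ///         // Secondary invalidated and not yet re-delivered.
    ///         _ => None,
    ///     }
    /// }
    /// ```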
    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
        let first_fire = {
            let mut s = self.lock_state();
            let rec = s.require_node_mut(node_id);
            let was_first = !rec.has_fired_once;
            rec.has_fired_once = true;
            was_first
        };
        // On first fire (gate release), always emit — the first-run gate
        // guarantees both deps have values (via prev_data fallback in
        // snapshot). On subsequent fires, only emit when primary fires.
        if !first_fire && !primary_fired {
            // Secondary-only update — no downstream DATA.
            self.settle_dirty_resolved(node_id);
            return;
        }
        // Post-warmup INVALIDATE guard: secondary may have been invalidated
        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
        if latest[1] == crate::handle::NO_HANDLE {
            self.settle_dirty_resolved(node_id);
            return;
        }
        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
        self.commit_emission_verbatim(node_id, tuple_handle);
    }

    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
    /// batch handles are collected, retained, and emitted individually.
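    ///
    /// A minimal sketch of the flattening, assuming each dep's batch is a
    /// `Vec<u32>` of handles; `merge_fire` is hypothetical. An empty result
    /// corresponds to the DIRTY+RESOLVED settle.
    ///
    /// ```rust
    /// // Forward every handle from every dep, in dep order then batch order.
    /// fn merge_fire(batches: &[Vec<u32>]) -> Vec<u32> {
    ///     batches.iter().flat_map(|b| b.iter().copied()).collect()
    /// }
    /// ```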
    fn fire_op_merge(&self, node_id: NodeId) {
        // Collect all batch handles from all deps (flat).
        let all_handles: Vec<HandleId> = {
            let s = self.lock_state();
            let rec = s.require_node(node_id);
            rec.dep_records
                .iter()
                .flat_map(|dr| dr.data_batch.iter().copied())
                .collect()
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if all_handles.is_empty() {
            // All deps settled RESOLVED this wave — no DATA to forward.
            self.settle_dirty_resolved(node_id);
            return;
        }
        // Emit each handle verbatim. Take a fresh retain per handle
        // (independent of the dep batch's retain which gets released at
        // wave-end). Matches Filter's discipline for passing inputs.
        for &h in &all_handles {
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
        }
    }

    // =================================================================
    // Slice C-3: flow operators (D024)
    // =================================================================

    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
    /// then self-completes via `Core::complete`. When `count == 0`, the
    /// first fire emits zero items then immediately self-completes
    /// (D027). Cross-wave counter lives in
    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
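    ///
    /// A minimal sketch of the quota logic across waves, assuming `u32`
    /// values as handles and ignoring refcounts; `take_wave` is
    /// hypothetical. It returns the values to emit and whether the
    /// operator self-completes.
    ///
    /// ```rust
    /// fn take_wave(count_emitted: &mut u32, count: u32, batch: &[u32]) -> (Vec<u32>, bool) {
    ///     let mut out = Vec::new();
    ///     // Already at quota (covers `count == 0`): complete without emitting.
    ///     if *count_emitted >= count {
    ///         return (out, true);
    ///     }
    ///     for &v in batch {
    ///         out.push(v);
    ///         *count_emitted = count_emitted.saturating_add(1);
    ///         if *count_emitted >= count {
    ///             break;
    ///         }
    ///     }
    ///     let done = *count_emitted >= count;
    ///     (out, done)
    /// }
    /// ```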
    fn fire_op_take(&self, node_id: NodeId, count: u32) {
        use crate::op_state::TakeState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        // Snapshot current counter; mark fired regardless of input count
        // (activation gate already satisfied or partial-mode).
        let mut count_emitted = {
            let s = self.lock_state();
            scratch_ref::<TakeState>(&s, node_id).count_emitted
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // Already at quota before any input this wave — self-complete
        // immediately. Covers `count == 0` (first-fire short-circuit) and,
        // defensively, any re-entry after the quota was reached; the
        // terminal-skip in `fire_operator` already guards against a
        // double complete.
        if count_emitted >= count {
            self.complete(node_id);
            return;
        }
        // Per-input emission loop. Each pass takes a fresh retain for the
        // cache slot; data_batch slot's retain is released at wave-end
        // rotation independently.
        for &h in &inputs {
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
            count_emitted = count_emitted.saturating_add(1);
            if count_emitted >= count {
                break;
            }
        }
        // Persist the updated counter.
        {
            let mut s = self.lock_state();
            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
        }
        // Self-complete if we hit the quota this wave. Upstream COMPLETE
        // (terminal == Some(Complete)) without us hitting the count
        // propagates via the standard auto-cascade — we don't intercept it.
        if count_emitted >= count {
            self.complete(node_id);
            return;
        }
        // If upstream is already Errored and we haven't hit count, the
        // standard cascade will propagate it. If the wave delivered no
        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
        // subscribers see the wave close.
        if inputs.is_empty() && terminal.is_none() {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
    /// then forwards the rest. Cross-wave counter lives in
    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
    /// On a wave where every input is still in the skip window, settles
    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
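    ///
    /// A minimal sketch of the skip window across waves, assuming `u32`
    /// values as handles; `skip_wave` is hypothetical. An empty result
    /// corresponds to the DIRTY+RESOLVED settle.
    ///
    /// ```rust
    /// fn skip_wave(count_skipped: &mut u32, count: u32, batch: &[u32]) -> Vec<u32> {
    ///     let mut out = Vec::new();
    ///     for &v in batch {
    ///         if *count_skipped < count {
    ///             // Still inside the skip window: count it, emit nothing.
    ///             *count_skipped = count_skipped.saturating_add(1);
    ///             continue;
    ///         }
    ///         out.push(v);
    ///     }
    ///     out
    /// }
    /// ```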
    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
        use crate::op_state::SkipState;
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        let mut count_skipped = {
            let s = self.lock_state();
            scratch_ref::<SkipState>(&s, node_id).count_skipped
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // No early-return on empty inputs: the post-loop `emitted == 0`
        // settle handles the empty-inputs case identically to the
        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
        // with `fire_op_take`).
        let mut emitted = 0usize;
        for &h in &inputs {
            if count_skipped < count {
                count_skipped = count_skipped.saturating_add(1);
                // Drop this input — the data_batch slot still owns its
                // retain (released at wave-end rotation). No emission.
                continue;
            }
            // Past the skip window — emit verbatim. Take a fresh retain
            // for the cache slot.
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
            emitted += 1;
        }
        // Persist the updated counter.
        {
            let mut s = self.lock_state();
            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
        }
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
    /// holds; on the first `false`, emits any preceding passes from the
    /// same batch then self-completes via `Core::complete`. Reuses
    /// [`BindingBoundary::predicate_each`] (D029).
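    ///
    /// A minimal sketch of the batch semantics, assuming `u32` values as
    /// handles and a Rust closure in place of the binding predicate;
    /// `take_while_wave` is hypothetical. The dispatch evaluates the
    /// predicate for the whole batch up front via `predicate_each`, while
    /// the sketch evaluates lazily; for a pure predicate both emit the
    /// same values.
    ///
    /// ```rust
    /// // Returns the values to emit and whether the operator self-completes.
    /// fn take_while_wave(batch: &[u32], pred: impl Fn(u32) -> bool) -> (Vec<u32>, bool) {
    ///     let mut out = Vec::new();
    ///     for &v in batch {
    ///         if !pred(v) {
    ///             // First false: keep the preceding passes, then complete.
    ///             return (out, true);
    ///         }
    ///         out.push(v);
    ///     }
    ///     (out, false)
    /// }
    /// ```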
    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Phase 2: predicate per input.
        let pass = self.binding.predicate_each(fn_id, &inputs);
        // Slice V2: promoted from debug_assert! — binding contract violation
        // should fail loud in release builds too.
        assert!(
            pass.len() == inputs.len(),
            "predicate_each returned {} bools for {} inputs",
            pass.len(),
            inputs.len()
        );
        // Phase 3: emit each input until the first false; then
        // self-complete. `fire_operator`'s `terminal.is_some()`
        // short-circuit gates re-entry after the self-complete cascade
        // installs the terminal slot — no extra `done` flag needed.
        let mut emitted = 0usize;
        let mut first_false_seen = false;
        for (i, &h) in inputs.iter().enumerate() {
            if pass.get(i).copied().unwrap_or(false) {
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
                emitted += 1;
            } else {
                first_false_seen = true;
                break;
            }
        }
        if first_false_seen {
            self.complete(node_id);
            return;
        }
        if emitted == 0 {
            // Unreachable in practice: an empty batch returned early above,
            // and a batch with no `false` emits every input. Kept as a
            // defensive settle only.
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
    /// default was registered) then `Complete` on upstream COMPLETE.
    /// On upstream ERROR, propagates verbatim. Storage:
    /// [`LastState`](super::op_state::LastState).
    ///
    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
    /// wave (`terminal == None`), `fire_op_last` updates the buffered
    /// `latest` handle but produces NO downstream wire message —
    /// subscribers observe the operator only when upstream
    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
    /// inputs from the dep's batch are dropped on the floor (their
    /// `data_batch` retains release at wave-end rotation
    /// independently). Omitting per-wave settlement on intermediate
    /// waves is the canonical behavior for terminal-aware operators.
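    ///
    /// A minimal sketch of the buffer-then-emit behavior, assuming `u32`
    /// values as handles, `None` as `NO_HANDLE`, refcounts ignored, and
    /// local `Terminal` / `Out` enums standing in for `TerminalKind` and
    /// the wire messages. `Terminal`, `Out`, and `last_wave` are all
    /// hypothetical.
    ///
    /// ```rust
    /// enum Terminal {
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// #[derive(Debug, PartialEq)]
    /// enum Out {
    ///     Data(u32),
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// // `latest` persists across waves; `default` is the optional fallback.
    /// fn last_wave(
    ///     latest: &mut Option<u32>,
    ///     default: Option<u32>,
    ///     batch: &[u32],
    ///     terminal: Option<Terminal>,
    /// ) -> Vec<Out> {
    ///     // Only the batch's last value is buffered; earlier ones are dropped.
    ///     if let Some(&v) = batch.last() {
    ///         *latest = Some(v);
    ///     }
    ///     match terminal {
    ///         // Silent buffering: no wire message on non-terminal waves.
    ///         None => Vec::new(),
    ///         Some(Terminal::Complete) => {
    ///             let mut out: Vec<Out> = (*latest).or(default).map(Out::Data).into_iter().collect();
    ///             out.push(Out::Complete);
    ///             out
    ///         }
    ///         Some(Terminal::Error(e)) => vec![Out::Error(e)],
    ///     }
    /// }
    /// ```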
    fn fire_op_last(&self, node_id: NodeId) {
        use crate::op_state::LastState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }

        // Phase 2: buffer the latest input handle (if any). Retain new,
        // release old. data_batch slot's retain is released at wave-end
        // rotation independently — the LastState slot keeps its own
        // share so the value survives across waves.
        if let Some(&new_latest) = inputs.last() {
            let prev_latest = {
                let mut s = self.lock_state();
                let scratch = scratch_mut::<LastState>(&mut s, node_id);
                let prev = scratch.latest;
                scratch.latest = new_latest;
                prev
            };
            self.binding.retain_handle(new_latest);
            if prev_latest != crate::handle::NO_HANDLE {
                self.binding.release_handle(prev_latest);
            }
        }

        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
        // produce no downstream message — Reduce-style silent
        // accumulation. The post-drain auto-resolve sweep is a no-op
        // because pending_notify has no entry for Last.
        match terminal {
            None => {}
            Some(TerminalKind::Complete) => {
                // Read the live latest + default. If latest != NO_HANDLE,
                // emit it. Otherwise, if default != NO_HANDLE, emit default.
                // Otherwise, emit only Complete (empty stream, no default).
                let (latest, default) = {
                    let s = self.lock_state();
                    let scratch = scratch_ref::<LastState>(&s, node_id);
                    (scratch.latest, scratch.default)
                };
                let to_emit = if latest != crate::handle::NO_HANDLE {
                    Some(latest)
                } else if default != crate::handle::NO_HANDLE {
                    Some(default)
                } else {
                    None
                };
                if let Some(h) = to_emit {
                    // Emission needs its own retain — the LastState slot
                    // keeps its share until reset/Drop.
                    self.binding.retain_handle(h);
                    self.commit_emission_verbatim(node_id, h);
                }
                self.complete(node_id);
            }
            Some(TerminalKind::Error(h)) => {
                // Take a fresh share for the error cascade — the
                // dep_records[0].terminal slot keeps its own share
                // (released by reset_for_fresh_lifecycle / Drop).
                self.binding.retain_handle(h);
                self.error(node_id, h);
            }
        }
    }

    // -----------------------------------------------------------------
    // Slice U: control operators — fire_op impls
    // -----------------------------------------------------------------

    /// Tap — side-effect passthrough. Invoke tap fn on each DATA, then
    /// emit each input handle unchanged (zero allocation).
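    ///
    /// A minimal sketch of the call ordering, assuming `u32` values as
    /// handles and closures in place of the binding's tap fn and the
    /// emission path; `tap_wave` is hypothetical. The hook runs before each
    /// value is forwarded, and the value is forwarded untransformed.
    ///
    /// ```rust
    /// fn tap_wave(batch: &[u32], mut hook: impl FnMut(u32), mut emit: impl FnMut(u32)) {
    ///     for &v in batch {
    ///         hook(v); // side effect first...
    ///         emit(v); // ...then forward the same value unchanged
    ///     }
    /// }
    /// ```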
    fn fire_op_tap(&self, node_id: NodeId, fn_id: FnId) {
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            if terminal.is_none() {
                self.settle_dirty_resolved(node_id);
            }
        } else {
            for &h in &inputs {
                self.binding.invoke_tap_fn(fn_id, h);
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
            }
        }
        // Terminal forwarding.
        match terminal {
            None => {}
            Some(TerminalKind::Complete) => {
                self.binding.invoke_tap_complete_fn(fn_id);
                self.complete(node_id);
            }
            Some(TerminalKind::Error(h)) => {
                self.binding.invoke_tap_error_fn(fn_id, h);
                self.binding.retain_handle(h);
                self.error(node_id, h);
            }
        }
    }

    /// TapFirst — one-shot side-effect on first DATA. After the first
    /// qualifying DATA, acts as pure passthrough.
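    ///
    /// A minimal sketch of the one-shot rule, assuming `u32` values as
    /// handles, a `bool` in place of `TapFirstState.fired`, and a closure
    /// for the tap fn; `tap_first_wave` is hypothetical. Every value is
    /// forwarded; only the first DATA ever seen reaches the hook.
    ///
    /// ```rust
    /// fn tap_first_wave(fired: &mut bool, batch: &[u32], mut hook: impl FnMut(u32)) -> Vec<u32> {
    ///     for &v in batch {
    ///         if !*fired {
    ///             hook(v);
    ///             // One-shot: later values, in this wave or later ones, skip the hook.
    ///             *fired = true;
    ///         }
    ///     }
    ///     // Passthrough: the whole batch is forwarded unchanged.
    ///     batch.to_vec()
    /// }
    /// ```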
    fn fire_op_tap_first(&self, node_id: NodeId, fn_id: FnId) {
        use crate::op_state::TapFirstState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            if terminal.is_none() {
                self.settle_dirty_resolved(node_id);
            }
        } else {
            let mut fired = {
                let s = self.lock_state();
                scratch_ref::<TapFirstState>(&s, node_id).fired
            };
            for &h in &inputs {
                if !fired {
                    self.binding.invoke_tap_fn(fn_id, h);
                    // Update the local snapshot as well as the scratch slot;
                    // otherwise the tap fn would fire for every input in the
                    // first non-empty batch instead of only the first DATA.
                    fired = true;
                    let mut s = self.lock_state();
                    scratch_mut::<TapFirstState>(&mut s, node_id).fired = true;
                }
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
            }
        }
        if let Some(TerminalKind::Complete) = terminal {
            self.complete(node_id);
        } else if let Some(TerminalKind::Error(h)) = terminal {
            self.binding.retain_handle(h);
            self.error(node_id, h);
        }
    }

    /// Valve — conditional forward. dep[0]=source, dep[1]=control.
    /// While the gate is open, forwards source DATA; otherwise settles
    /// RESOLVED. In v1 the gate counts as open once control has delivered
    /// a real handle (not `NO_HANDLE`); value-level truthiness is
    /// deferred (D048).
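    ///
    /// A minimal sketch of the gate decision, assuming `u32` values as
    /// handles and `None` as `NO_HANDLE`; `valve_wave` is hypothetical and
    /// omits the source-terminal forwarding that the dispatch handles
    /// first.
    ///
    /// ```rust
    /// // Some(values) means "emit these"; None means "settle DIRTY+RESOLVED".
    /// fn valve_wave(ctrl_latest: Option<u32>, src_batch: &[u32], src_prev: Option<u32>) -> Option<Vec<u32>> {
    ///     // v1 gate: open once control has delivered any real handle.
    ///     if ctrl_latest.is_none() {
    ///         return None;
    ///     }
    ///     if src_batch.is_empty() {
    ///         // Gate open but no source DATA this wave: replay the previous value.
    ///         return src_prev.map(|v| vec![v]);
    ///     }
    ///     Some(src_batch.to_vec())
    /// }
    /// ```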
    fn fire_op_valve(&self, node_id: NodeId) {
        // Snapshot both deps.
        let (src_inputs, src_terminal, ctrl_latest) = {
            let s = self.lock_state();
            let rec = s.require_node(node_id);
            debug_assert!(rec.dep_records.len() == 2, "valve must have exactly 2 deps");
            let dr0 = &rec.dep_records[0];
            let dr1 = &rec.dep_records[1];
            let src_inputs: Vec<HandleId> = dr0.data_batch.iter().copied().collect();
            let src_term = dr0.terminal;
            // Latest control: last of this wave's batch, or prev_data.
            let ctrl = dr1.data_batch.last().copied().unwrap_or(dr1.prev_data);
            (src_inputs, src_term, ctrl)
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }

        // Source terminal forwarding (D3).
        if let Some(TerminalKind::Complete) = src_terminal {
            self.complete(node_id);
            return;
        }
        if let Some(TerminalKind::Error(h)) = src_terminal {
            self.binding.retain_handle(h);
            self.error(node_id, h);
            return;
        }

        // Gate: NO_HANDLE means "gate closed" (control never sent DATA);
        // any real handle means "gate open". Proper value-level truthiness
        // would require BindingBoundary::is_truthy (deferred — D048).
        let gate_open = ctrl_latest != crate::handle::NO_HANDLE;

        if !gate_open {
            self.settle_dirty_resolved(node_id);
            return;
        }

        if src_inputs.is_empty() {
            // Control opened but no source DATA this wave. Re-emit
            // prev source value if available.
            let prev_src = {
                let s = self.lock_state();
                s.require_node(node_id).dep_records[0].prev_data
            };
            if prev_src == crate::handle::NO_HANDLE {
                self.settle_dirty_resolved(node_id);
            } else {
                self.binding.retain_handle(prev_src);
                self.commit_emission_verbatim(node_id, prev_src);
            }
        } else {
            for &h in &src_inputs {
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
            }
        }
    }

    /// Settle — convergence detector. Forwards DATA, counts quiet waves,
    /// and self-completes once converged: at least one DATA has been
    /// seen and the last `quiet_waves` fires carried no DATA, or
    /// `max_waves` fires have elapsed.
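    ///
    /// A minimal sketch of the convergence counters, assuming a local
    /// `Counters` struct that mirrors the three `SettleState` fields the
    /// dispatch updates; `Counters` and `settle_wave` are hypothetical.
    /// The function returns `true` when the operator should self-complete
    /// after the wave.
    ///
    /// ```rust
    /// #[derive(Default)]
    /// struct Counters {
    ///     wave_count: u32,
    ///     quiet_count: u32,
    ///     has_value: bool,
    /// }
    ///
    /// fn settle_wave(st: &mut Counters, saw_data: bool, quiet_waves: u32, max_waves: Option<u32>) -> bool {
    ///     st.wave_count += 1;
    ///     if saw_data {
    ///         st.has_value = true;
    ///         st.quiet_count = 0; // any DATA resets the quiet streak
    ///     } else {
    ///         st.quiet_count += 1;
    ///     }
    ///     let settled = st.has_value && st.quiet_count >= quiet_waves;
    ///     let exhausted = max_waves.is_some_and(|max| st.wave_count >= max);
    ///     settled || exhausted
    /// }
    /// ```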
    fn fire_op_settle(&self, node_id: NodeId, quiet_waves: u32, max_waves: Option<u32>) {
        use crate::op_state::SettleState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }

        // Terminal forwarding.
        if let Some(TerminalKind::Complete) = terminal {
            self.complete(node_id);
            return;
        }
        if let Some(TerminalKind::Error(h)) = terminal {
            self.binding.retain_handle(h);
            self.error(node_id, h);
            return;
        }

        let saw_data = !inputs.is_empty();

        // Forward all DATA.
        for &h in &inputs {
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
        }

        // Update counters.
        let should_complete = {
            let mut s = self.lock_state();
            let scratch = scratch_mut::<SettleState>(&mut s, node_id);
            scratch.wave_count += 1;
            if saw_data {
                scratch.has_value = true;
                scratch.quiet_count = 0;
            } else {
                scratch.quiet_count += 1;
            }
            let settled = scratch.has_value && scratch.quiet_count >= quiet_waves;
            let exhausted = max_waves.is_some_and(|max| scratch.wave_count >= max);
            settled || exhausted
        };

        if should_complete {
            self.complete(node_id);
        } else if !saw_data {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// Single-edge propagation: pushes `handle` onto
    /// `dep_records[dep_idx].data_batch` of `consumer_id` (taking a retain
    /// share for the batch slot) and schedules the consumer in
    /// `pending_fires`. Dynamic consumers are scheduled only if `dep_idx`
    /// is tracked or they have not fired yet; consumers paused in
    /// `PausableMode::Default` are marked `pending_wave` instead.
    pub(crate) fn deliver_data_to_consumer(
        &self,
        s: &mut CoreState,
        consumer_id: NodeId,
        dep_idx: usize,
        handle: HandleId,
    ) {
        // Retain the handle for the batch accumulation slot — each DATA
        // handle in `data_batch` owns a retain share, released at wave-end
        // rotation in `clear_wave_state`.
        self.binding.retain_handle(handle);

        let is_dynamic;
        let is_state;
        let tracked_or_first_fire;
        // Slice F audit close (2026-05-07): default-mode pause suppression.
        // If the consumer is paused with `PausableMode::Default`, the
        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
        // pause-window dep deliveries into one fn execution on RESUME.
        // Mark `pending_wave` on the pause state instead of adding to
        // `pending_fires`. The dep state still advances (the data_batch push
        // below happens either way), and clear_wave_state still rotates the
        // latest dep DATA into prev_data — so when the fn ultimately fires on
        // RESUME, it sees the consolidated post-pause state.
        let suppressed_for_default_pause;
        {
            let consumer = s.require_node_mut(consumer_id);
            consumer.dep_records[dep_idx].data_batch.push(handle);
            consumer.dep_records[dep_idx].involved_this_wave = true;
            consumer.involved_this_wave = true;
            // §10.13 perf (D047): set received_mask bit on first DATA
            // delivery for this dep.
            if dep_idx < 64 {
                consumer.received_mask |= 1u64 << dep_idx;
                // §10.3 perf (Slice V1): set involved_mask bit for
                // O(1) per-dep involvement query during fire.
                consumer.involved_mask |= 1u64 << dep_idx;
            }
            is_dynamic = consumer.is_dynamic;
            is_state = consumer.is_state();
            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
            suppressed_for_default_pause = consumer.pause_state.is_paused()
                && consumer.pausable == crate::node::PausableMode::Default;
            if suppressed_for_default_pause {
                consumer.pause_state.mark_pending_wave();
            }
        }
        if suppressed_for_default_pause {
            // Default-mode pause: don't add to pending_fires; RESUME will
            // schedule one consolidated fire.
            return;
        }
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
        // per-thread WaveState. State lock + WaveState borrow are
        // independent.
        if is_state {
            // State nodes don't have deps; unreachable in practice.
        } else if is_dynamic {
            if tracked_or_first_fire {
                with_wave_state(|ws| {
                    ws.pending_fires.insert(consumer_id);
                });
            }
        } else {
            // Derived / Operator / Producer (Producer has no deps so won't
            // reach here, but the predicate-based dispatch handles it
            // uniformly).
            with_wave_state(|ws| {
                ws.pending_fires.insert(consumer_id);
            });
        }
    }

    // -------------------------------------------------------------------
    // Subscriber notification
    // -------------------------------------------------------------------

    /// Queue a wave-end message for `node_id`'s subscribers.
    ///
    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
    /// 2026-05-08):** each push for a given node either appends the
    /// message to the open batch (if `NodeRecord::subscribers_revision`
    /// hasn't advanced since that batch opened — the common case — no
    /// extra allocation), or opens a fresh batch with a current sink
    /// snapshot frozen at the new revision. A sub installed mid-wave
    /// bumps `subscribers_revision`; the next `queue_notify` for the
    /// same node observes the bump and starts a new batch that includes
    /// the new sub. Pre-subscribe batches retain their original snapshot,
    /// so earlier emits flush to their original sink list — the new sub
    /// does NOT double-receive them via flush AND handshake replay,
    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
    ///
    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
    /// Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
    /// paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
    /// COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
    /// flush immediately. START (tier 0) is per-subscription and never
    /// transits queue_notify.
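    ///
    /// A minimal sketch of the pause-routing predicate, assuming a local
    /// `Mode` enum that mirrors `PausableMode`'s three variants; `Mode` and
    /// `buffers_while_paused` are hypothetical, and the early return for
    /// nodes without subscribers is omitted. `true` means the message goes
    /// to the pause buffer rather than `pending_notify`.
    ///
    /// ```rust
    /// enum Mode {
    ///     ResumeAll,
    ///     Default,
    ///     Off,
    /// }
    ///
    /// fn buffers_while_paused(tier: u8, mode: Mode, is_state: bool, paused: bool) -> bool {
    ///     // Only tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE) are bufferable.
    ///     let buffered_tier = matches!(tier, 3 | 4);
    ///     let mode_buffers = match mode {
    ///         Mode::ResumeAll => true,
    ///         // Default-mode compute nodes are suppressed upstream instead.
    ///         Mode::Default => is_state,
    ///         Mode::Off => false,
    ///     };
    ///     buffered_tier && mode_buffers && paused
    /// }
    /// ```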
    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
        // wave-content invariant assertion. The tier-3 slot at one node in
        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
        // never multiple RESOLVED. Slice G moved equals substitution from
        // per-emit to wave-end coalescing; this assert pins that the
        // dispatcher itself never queues a violating combination at the
        // queue_notify granularity. Resolved arrivals come from:
        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
        //      `!any tier-3` so it can't add to a wave with Data).
        //   2. The wave-end equals-substitution pass (rewrites in place,
        //      doesn't go through queue_notify).
        // Both honor R1.3.3.a by construction post-Slice-G.
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
        // on per-thread WaveState. The dev-mode invariant assertion
        // borrows WaveState briefly and drops before the rest of
        // queue_notify proceeds.
        #[cfg(debug_assertions)]
        if matches!(msg.tier(), 3) {
            with_wave_state(|ws| {
                if let Some(entry) = ws.pending_notify.get(&node_id) {
                    // Walk all batches' messages — R1.3.3.a is a per-node
                    // wave-content invariant, not per-batch (the X4 batches
                    // are subscriber-snapshot epochs; the protocol-level
                    // tier-3 invariant spans the whole wave for the node).
                    let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
                    let resolved_count = entry
                        .iter_messages()
                        .filter(|m| matches!(m, Message::Resolved))
                        .count();
                    let incoming_is_data = matches!(msg, Message::Data(_));
                    if incoming_is_data {
                        debug_assert!(
                            resolved_count == 0,
                            "R1.3.3.a violation at {node_id:?}: queueing Data into a \
                             wave that already contains Resolved — Slice G should have \
                             prevented this via wave-end coalescing"
                        );
                    } else {
                        debug_assert!(
                            !has_data,
                            "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
                             wave that already contains Data"
                        );
                        debug_assert!(
                            resolved_count == 0,
                            "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
                             wave at one node"
                        );
                    }
                }
            });
        }

        let buffered_tier = matches!(msg.tier(), 3 | 4);
        let cap = s.pause_buffer_cap;

        // Pause-routing branch — handles its own retain/release and returns
        // before we touch `pending_notify`, so the rec borrow is contained.
        {
            let rec = s.require_node_mut(node_id);
            if rec.subscribers.is_empty() {
                return;
            }
            // Slice F audit close (2026-05-07): pause routing depends on mode.
            // - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
            // - `Default` + STATE node: state nodes have no fn-fire to
            //   suppress, so buffer like resumeAll (collapse-to-latest is
            //   a future enhancement; v1 keeps verbatim).
            // - `Default` + COMPUTE node: suppression happens upstream at
            //   fn-fire scheduling (see `deliver_data_to_consumer`); no
            //   outgoing tier-3 is produced from this node while paused,
            //   so this branch is unreachable for compute-default-paused.
            //   Fallthrough to the non-paused queue path is fine.
            // - `Off`: pause is ignored entirely — tier-3 flushes
            //   immediately. Fallthrough.
            let mode_buffers_tier3 = match rec.pausable {
                crate::node::PausableMode::ResumeAll => true,
                crate::node::PausableMode::Default => rec.is_state(),
                crate::node::PausableMode::Off => false,
            };
            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
                if let Some(h) = msg.payload_handle() {
                    self.binding.retain_handle(h);
                }
                let push_result = rec.pause_state.push_buffered(msg, cap);
                for dm in push_result.dropped_msgs {
                    if let Some(h) = dm.payload_handle() {
                        self.binding.release_handle(h);
                    }
                }
                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
                // schedule a synthesized ERROR for wave-end emission.
                // `cap` is `Some` here (an overflow can only happen with a
                // configured cap), so the `unwrap_or(0)` fallback below is
                // never taken.
                if push_result.first_overflow_this_cycle {
                    if let Some((dropped_count, lock_held_ns)) =
                        rec.pause_state.overflow_diagnostic()
                    {
                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
                        // pending_pause_overflow lives on per-thread WaveState.
                        with_wave_state(|ws| {
                            ws.pending_pause_overflow
                                .push(crate::node::PendingPauseOverflow {
                                    node_id,
                                    dropped_count,
                                    configured_max: cap.unwrap_or(0),
                                    lock_held_ns,
                                });
                        });
                    }
                }
                return;
            }
        }

        // Non-paused queue path: retain payload handle and queue into
        // pending_notify. Released in `flush_notifications` after sinks
        // fire.
        if let Some(h) = msg.payload_handle() {
            self.binding.retain_handle(h);
        }
        Self::push_into_pending_notify(s, node_id, msg);
    }

    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
    /// non-paused path. Either appends `msg` to the open batch (if
    /// `subscribers_revision` hasn't advanced since it opened — common
    /// case, no extra allocation) or opens a fresh batch with a current
    /// sink snapshot frozen at the new revision.
    ///
    /// Borrow discipline: every read from `s.nodes` (`subscribers_revision`
    /// and the sink snapshot) happens outside any WaveState borrow, so the
    /// two scopes stay disjoint.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
    /// to per-thread WaveState. The state-side read of
    /// `subscribers_revision` / `subscribers` happens before the
    /// `with_wave_state` block opens, then the WaveState borrow
    /// performs the entry insertion / append. State lock + WaveState
    /// borrow remain independent.
    ///
    /// Lock-discipline assumption: this read of `subscribers_revision`
    /// is safe because both the subscribe install path
    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
    /// `CoreState`'s mutex when they bump / read the revision —
    /// concurrent subscribe/unsubscribe cannot interleave. **If
    /// `Core::subscribe` ever moves the sink-install lock-released
    /// (mirroring the lock-released drain refactor), the revision read
    /// here must re-validate post-borrow — otherwise a fresh batch
    /// could open with a stale snapshot.**
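    ///
    /// A minimal sketch of the revision-tracked batching for one node,
    /// assuming `u32` sink ids, string messages, and a `Vec` of batches;
    /// `Batch` and `push_batched` are hypothetical stand-ins for
    /// `PendingBatch` and this function.
    ///
    /// ```rust
    /// struct Batch {
    ///     snapshot_revision: u64,
    ///     sinks: Vec<u32>,
    ///     messages: Vec<&'static str>,
    /// }
    ///
    /// fn push_batched(batches: &mut Vec<Batch>, current_rev: u64, subscribers: &[u32], msg: &'static str) {
    ///     // Reuse the open batch only if no subscribe/unsubscribe happened since it opened.
    ///     let reuse = batches
    ///         .last()
    ///         .is_some_and(|b| b.snapshot_revision == current_rev);
    ///     if reuse {
    ///         batches.last_mut().expect("checked above").messages.push(msg);
    ///     } else {
    ///         // Freeze the current subscriber list for this and later messages.
    ///         batches.push(Batch {
    ///             snapshot_revision: current_rev,
    ///             sinks: subscribers.to_vec(),
    ///             messages: vec![msg],
    ///         });
    ///     }
    /// }
    /// ```
    ///
    /// A subscriber installed mid-wave bumps the revision, so its first
    /// message opens a second batch; the earlier batch keeps its original
    /// sink list and is never replayed to the newcomer.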
    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
        let current_rev = s.require_node(node_id).subscribers_revision;
        let needs_new_batch = with_wave_state(|ws| {
            ws.pending_notify.get(&node_id).is_none_or(|entry| {
                entry
                    .batches
                    .last()
                    .is_none_or(|b| b.snapshot_revision != current_rev)
            })
        });
        let sinks_snapshot: SmallVec<[Sink; 1]> = if needs_new_batch {
            s.require_node(node_id)
                .subscribers
                .values()
                .cloned()
                .collect()
        } else {
            SmallVec::new()
        };
        with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
            Entry::Vacant(slot) => {
                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
                batches.push(PendingBatch {
                    snapshot_revision: current_rev,
                    sinks: sinks_snapshot,
                    messages: smallvec::smallvec![msg],
                });
                slot.insert(PendingPerNode { batches });
            }
            Entry::Occupied(mut slot) => {
                let entry = slot.get_mut();
                if needs_new_batch {
                    entry.batches.push(PendingBatch {
                        snapshot_revision: current_rev,
                        sinks: sinks_snapshot,
                        messages: smallvec::smallvec![msg],
                    });
                } else {
                    entry
                        .batches
                        .last_mut()
                        .expect("non-empty by construction (entry exists implies batch exists)")
                        .messages
                        .push(msg);
                }
            }
        });
    }

    /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
    /// payload-handle releases owed for `pending_notify` into
    /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
    /// run **after** the state lock is dropped — see [`Core::run_wave`].
    ///
    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
    /// here by collecting phase-major, node-minor: the outer loop walks the
    /// phases and the inner loop walks every pending node, so phase 1's jobs
    /// sit before phase 2's in `deferred_flush_jobs`. When `run_wave` drains
    /// that queue lock-released, multi-node subscribers see all DIRTYs
    /// before any settle. Matches TS's drainPhase model without the
    /// per-tier queue indirection.
    ///
    /// Phase ordering:
    /// 1. tier 1 (DIRTY)
    /// 2. tiers 3 + 4 (DATA/RESOLVED + INVALIDATE, the "settle slice")
    /// 3. tier 5 (COMPLETE/ERROR)
    /// 4. tier 6 (TEARDOWN)
    ///
    /// Tier 0 (START) is per-subscription (never enters `pending_notify`) and
    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
    /// bypassing `pending_notify`; both are absent from this enumeration.
    ///
    /// Within a single phase, nodes flush in the order they first entered
    /// `pending_notify` (IndexMap insertion order): if A was queued before B,
    /// A's phase-2 messages flush before B's. Within a single node, message
    /// order is preserved.
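    ///
    /// The phase-major, node-minor collection order can be sketched without
    /// any Core types. This is a hypothetical simplification: `(tier,
    /// payload)` pairs stand in for `Message`, a slice of `(node, messages)`
    /// in first-touch order stands in for the `pending_notify` map, and sink
    /// snapshots and batches are omitted.
    ///
    /// ```
    /// // Phase table copied from `flush_notifications`.
    /// const PHASES: &[&[u8]] = &[&[1], &[3, 4], &[5], &[6]];
    ///
    /// // One (node, payloads) job per non-empty (phase, node) pair.
    /// fn collect_jobs(
    ///     pending: &[(&'static str, Vec<(u8, u32)>)],
    /// ) -> Vec<(&'static str, Vec<u32>)> {
    ///     let mut jobs = Vec::new();
    ///     for &tiers in PHASES {
    ///         // Inner loop walks nodes in first-touch order.
    ///         for (node, msgs) in pending {
    ///             let phase_msgs: Vec<u32> = msgs
    ///                 .iter()
    ///                 .filter(|(tier, _)| tiers.contains(tier))
    ///                 .map(|&(_, payload)| payload)
    ///                 .collect();
    ///             if !phase_msgs.is_empty() {
    ///                 jobs.push((*node, phase_msgs));
    ///             }
    ///         }
    ///     }
    ///     jobs
    /// }
    /// ```
    ///
    /// For node `a` holding a DIRTY and a DATA, and node `b` holding a
    /// DIRTY, a DATA, and a COMPLETE, the jobs come out as `a` DIRTY, `b`
    /// DIRTY, `a` DATA, `b` DATA, `b` COMPLETE.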
    fn flush_notifications(&self, s: &mut CoreState) {
        const PHASES: &[&[u8]] = &[
            &[1],    // DIRTY
            &[3, 4], // DATA/RESOLVED + INVALIDATE
            &[5],    // COMPLETE/ERROR
            &[6],    // TEARDOWN
        ];
        // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
        // deferred_handle_releases, and deferred_flush_jobs all live on
        // per-thread WaveState. Take pending_notify under the WaveState
        // borrow, drop the borrow, run the per-phase loop (no WaveState
        // access in the loop body), then re-borrow WaveState at the end
        // to push the collected jobs and payload-handle releases.
        //
        // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
        // currently unused inside the per-phase loop — `pending` was
        // moved off `s` to WaveState by sub-slice 2, and the per-batch
        // sink snapshot is already on the PendingBatch. Kept as a
        // parameter to preserve the caller's `let mut s = lock_state();
        // self.flush_notifications(&mut s);` invocation shape (caller
        // holds the state lock around this call — load-bearing for
        // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
        // released" marker; the lock guard belongs to the caller and
        // is held throughout this function. A future change that adds
        // an in-loop state read should remove the discard below;
        // removing the parameter would break the caller's ability to
        // express the lock-discipline contract at the call site.
        let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
        let pending = with_wave_state(|ws| std::mem::take(&mut ws.pending_notify));
        let mut jobs: DeferredJobs = Vec::new();
        for &phase_tiers in PHASES {
            for (_node_id, entry) in &pending {
                // Slice X4 / D2: iterate batches in arrival order. Each
                // batch carries its own sink snapshot frozen at open-time;
                // a batch's messages flush to ITS sinks only. Within a
                // single (phase, node), batches stay in arrival order so
                // emit-order semantics are preserved across batches.
                for batch in &entry.batches {
                    if batch.sinks.is_empty() {
                        continue;
                    }
                    let phase_msgs: Vec<Message> = batch
                        .messages
                        .iter()
                        .copied()
                        .filter(|m| phase_tiers.contains(&m.tier()))
                        .collect();
                    if phase_msgs.is_empty() {
                        continue;
                    }
                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
                    jobs.push((sinks_clone, phase_msgs));
                }
            }
        }
        // Single WaveState borrow at the end: push the collected jobs
        // and the payload-handle releases. Refcount release balances the
        // retain done in `queue_notify` for every payload-bearing message
        // that landed in pending_notify (across ALL batches per node);
        // deferred to post-lock-drop so the binding's release path can't
        // re-enter Core under our lock.
        with_wave_state(|ws| {
            ws.deferred_flush_jobs.append(&mut jobs);
            for entry in pending.values() {
                for msg in entry.iter_messages() {
                    if let Some(h) = msg.payload_handle() {
                        ws.deferred_handle_releases.push(h);
                    }
                }
            }
        });
    }

    /// Take the deferred sink-fire jobs, payload-handle releases,
    /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
    /// Callers pair this with `drop(state_guard)` and a subsequent
    /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
    /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
    /// Q2(b) eager wipe_ctx fires lock-released.
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
    /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
    /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
    /// WaveState. The `_s: &mut CoreState` parameter is now unused but
    /// kept to preserve the call-site lock-discipline ordering (caller
    /// holds the state lock around this call to interleave with prior
    /// `clear_wave_state` per-NodeRecord work).
    pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
        (
            std::mem::take(&mut ws.deferred_flush_jobs),
            std::mem::take(&mut ws.deferred_handle_releases),
            std::mem::take(&mut ws.deferred_cleanup_hooks),
            std::mem::take(&mut ws.pending_wipes),
        )
    }

    /// Fire deferred sink-fire jobs in collected order, then release the
    /// payload handles owed for messages that landed in `pending_notify`
    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
    /// hooks, and finally run the queued eager `wipe_ctx` calls. All four
    /// phases run lock-released so:
    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
    ///   state lock cleanly and run their own nested wave.
    /// - The binding's `release_handle` path can't deadlock against a
    ///   binding-side mutex held by Core.
    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
    ///   may safely re-enter Core for unrelated nodes.
    ///
    /// **Cleanup-drain panic discipline (D060):** each sink fire,
    /// `cleanup_for` call, and `wipe_ctx` call is wrapped in `catch_unwind`
    /// so a single panic doesn't short-circuit the per-wave drain. All
    /// queued attempts run; if any panicked, the LAST panic re-raises after
    /// every loop completes (preserving wave-end discipline while still
    /// surfacing failures). `release_handle` calls are not wrapped.
    /// Per D060, Core stays panic-naive about user code — bindings own
    /// their host-language panic policy inside `cleanup_for`; this
    /// `catch_unwind` is purely about drain-don't-short-circuit.
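    ///
    /// The drain-don't-short-circuit pattern in isolation, as a minimal
    /// sketch: `drain_all` is a hypothetical helper, and boxed closures
    /// stand in for sink fires, cleanup hooks, and `wipe_ctx` calls. Every
    /// job runs even if earlier ones panic, and only the last panic payload
    /// is kept for the caller to hand to `std::panic::resume_unwind`.
    ///
    /// ```
    /// use std::any::Any;
    /// use std::panic::{self, AssertUnwindSafe};
    ///
    /// fn drain_all(jobs: Vec<Box<dyn FnOnce()>>) -> Option<Box<dyn Any + Send>> {
    ///     let mut last_panic = None;
    ///     for job in jobs {
    ///         // Isolate each job so one panic cannot skip the rest.
    ///         if let Err(payload) = panic::catch_unwind(AssertUnwindSafe(job)) {
    ///             last_panic = Some(payload); // later panics overwrite earlier ones
    ///         }
    ///     }
    ///     last_panic
    /// }
    /// ```
    ///
    /// With four jobs where the first and third panic, both non-panicking
    /// jobs still run and the returned payload is the third job's.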
    pub(crate) fn fire_deferred(
        &self,
        jobs: DeferredJobs,
        releases: Vec<HandleId>,
        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
        pending_wipes: Vec<crate::handle::NodeId>,
    ) {
        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
        // `catch_unwind` so a panicking sink doesn't unwind out of
        // `fire_deferred` and drop the queued `releases`,
        // `cleanup_hooks`, and `pending_wipes`. Mirrors Slice F audit fix
        // A7's per-tier handshake-fire discipline. Without this guard, a
        // sink panic here would silently leak handle retains AND silently
        // drop OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
        // we re-raise the last panic at the end after running every
        // queued fire — drain ordering is preserved.
        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
        for (sinks, msgs) in jobs {
            for sink in &sinks {
                let sink = sink.clone();
                let msgs_ref = &msgs;
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                    sink(msgs_ref);
                }));
                if let Err(payload) = result {
                    last_panic = Some(payload);
                }
            }
        }
        for h in releases {
            self.binding.release_handle(h);
        }
        // Slice E2 (D060): drain cleanup hooks with per-item panic
        // isolation so the loop always completes. AssertUnwindSafe is
        // safe here because we don't rely on logical state being valid
        // post-panic — the panic propagates anyway after the drain ends.
        for (node_id, trigger) in cleanup_hooks {
            let binding = &self.binding;
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                binding.cleanup_for(node_id, trigger);
            }));
            if let Err(payload) = result {
                last_panic = Some(payload);
            }
        }
        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
        // same per-item panic isolation. Fires AFTER cleanup hooks so a
        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
        // fires in the same wave) sees pre-wipe binding state if it
        // landed in the same wave as the terminal cascade. Mutually
        // exclusive with `Subscription::Drop`'s direct-fire site, but
        // even concurrent fires are idempotent (binding's `wipe_ctx`
        // calls `HashMap::remove` which is a no-op on absent keys).
        for node_id in pending_wipes {
            let binding = &self.binding;
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                binding.wipe_ctx(node_id);
            }));
            if let Err(payload) = result {
                last_panic = Some(payload);
            }
        }
        if let Some(payload) = last_panic {
            std::panic::resume_unwind(payload);
        }
    }

    // -------------------------------------------------------------------
    // User-facing batch — coalesce multiple emits into one wave
    // -------------------------------------------------------------------

    /// Coalesce multiple emissions into a single wave. Every `emit` /
    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
    /// queues its downstream work; the wave drains when `f` returns.
    ///
    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
    /// — repeated emits on the same node coalesce into a single multi-message
    /// delivery: one [`Message::Dirty`] for the wave, delivered in phase 1,
    /// plus one [`Message::Data`] per emit, delivered together in the node's
    /// phase-2 pass.
    ///
    /// Nested `batch()` calls share the outer wave; only the outermost call
    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
    /// engine's own `in_tick` re-entrance) compose with this method
    /// transparently — they observe `in_tick = true` and skip drain just
    /// like nested `batch()`.
    ///
    /// On panic inside `f`, the `BatchGuard` returned by the internal
    /// `begin_batch` call is dropped during unwinding and discards pending
    /// tier-3+ work (subscribers do not observe the half-built wave). See
    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
    /// over the scope boundary.
    pub fn batch<F>(&self, f: F)
    where
        F: FnOnce(),
    {
        let _guard = self.begin_batch();
        f();
    }

    /// RAII batch handle — opens a wave when constructed, drains on drop.
    ///
    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
    /// boundary so callers can compose batches with non-`FnOnce` control
    /// flow (e.g. async-state-machine code paths, or splitting setup and
    /// drain across helper functions).
    ///
    /// ```
    /// use graphrefly_core::{Core, BindingBoundary, NodeRegistration, NodeOpts,
    ///     HandleId, NodeId, FnId, FnResult, DepBatch};
    /// use std::sync::Arc;
    ///
    /// struct Stub;
    /// impl BindingBoundary for Stub {
    ///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
    ///         FnResult::Noop { tracked: None }
    ///     }
    ///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
    ///     fn release_handle(&self, _: HandleId) {}
    /// }
    ///
    /// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
    /// let state_a = core.register(NodeRegistration {
    ///     deps: vec![], fn_or_op: None,
    ///     opts: NodeOpts { initial: HandleId::new(1), ..Default::default() },
    /// }).unwrap();
    /// let state_b = core.register(NodeRegistration {
    ///     deps: vec![], fn_or_op: None,
    ///     opts: NodeOpts { initial: HandleId::new(2), ..Default::default() },
    /// }).unwrap();
    ///
    /// let g = core.begin_batch();
    /// core.emit(state_a, HandleId::new(10));
    /// core.emit(state_b, HandleId::new(20));
    /// drop(g); // wave drains here
    /// ```
    ///
    /// Like the closure form, nested `begin_batch` calls share the outer
    /// wave (only the outermost guard drains).
    ///
    /// # Panics
    ///
    /// Panics if the registry-epoch retry-validate loop exceeds
    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations (pathological
    /// concurrent `register` / `set_deps` activity racing with
    /// closure-form batch entry), or if acquiring a partition's
    /// `wave_owner` returns a `PartitionOrderViolation` (Phase H+ STRICT,
    /// D115). The retry bound is unreachable in correct call paths.
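    ///
    /// The retry-validate loop reduces to "snapshot, acquire, re-check the
    /// epoch". A self-contained sketch under simplifying assumptions:
    /// `Registry`, `acquire_all`, and `MAX_RETRIES` are hypothetical, the
    /// `on_acquire` callback stands in for taking each partition's lock,
    /// and exhaustion returns `None` where `begin_batch` panics.
    ///
    /// ```
    /// use std::sync::atomic::{AtomicU64, Ordering};
    /// use std::sync::Mutex;
    ///
    /// struct Registry {
    ///     epoch: AtomicU64, // bumped on every register/union/split
    ///     partitions: Mutex<Vec<u32>>,
    /// }
    ///
    /// const MAX_RETRIES: usize = 8;
    ///
    /// // Returns the partition snapshot that was held and the attempt count.
    /// fn acquire_all(
    ///     reg: &Registry,
    ///     mut on_acquire: impl FnMut(),
    /// ) -> Option<(Vec<u32>, usize)> {
    ///     for attempt in 1..=MAX_RETRIES {
    ///         let before = reg.epoch.load(Ordering::Acquire);
    ///         let snapshot = reg.partitions.lock().unwrap().clone();
    ///         on_acquire();
    ///         if reg.epoch.load(Ordering::Acquire) == before {
    ///             return Some((snapshot, attempt)); // snapshot stayed authoritative
    ///         }
    ///         // Epoch moved: release everything and retry with a fresh snapshot.
    ///     }
    ///     None
    /// }
    /// ```
    ///
    /// If a partition is registered while the first attempt is acquiring,
    /// the epoch check fails once and the second attempt returns the
    /// enlarged snapshot.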
    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
    pub fn begin_batch(&self) -> BatchGuard {
        // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
        // seed; per session-doc Q7 / D092 it MUST serialize against every
        // currently-existing partition. Acquire each partition's
        // `wave_owner` in ascending [`SubgraphId`] order via the
        // retry-validate primitive. Same-thread re-entry passes through each
        // ReentrantMutex transparently; cross-thread waves on any of the
        // touched partitions block until our `wave_guards` drop.
        //
        // **QA-fix #2 (2026-05-09) — registry epoch retry-validate:** a
        // concurrent `register` / `set_deps`-driven union/split between
        // our `all_partitions_lock_boxes()` snapshot and the
        // post-acquire epoch read changes the partition set. We then retry
        // the whole acquire with the new snapshot. Without this, a
        // partition added after our snapshot would not be held by our
        // batch — breaking the closure-form's "all-partitions
        // serialization" contract.
        //
        // Trade-off (documented v1 contract): closure-form batch is the
        // serialization point under per-partition parallelism. Per-seed
        // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
        // acquire only the touched partitions and run truly parallel
        // for disjoint partitions.
        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
            let epoch_before = self.registry.lock().epoch();
            let partition_boxes = self.all_partitions_lock_boxes();
            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
            for (sid, _box) in &partition_boxes {
                // Use the partition's root NodeId as the lock_for retry
                // seed. SubgraphId.raw() == root NodeId.raw(); the root
                // is always registered in the X5 / Phase-E substrate
                // (cleanup_node is gated, Phase G activates).
                let representative = crate::handle::NodeId::new(sid.raw());
                wave_guards.push(
                    self.partition_wave_owner_lock_arc(representative)
                        .unwrap_or_else(|e| panic!("{e}")),
                );
            }
            // Post-acquire epoch read. If unchanged, our snapshot is
            // still authoritative — every existing partition was held
            // throughout. If changed, drop guards and retry.
            let epoch_after = self.registry.lock().epoch();
            if epoch_after == epoch_before {
                return self.begin_batch_with_guards(wave_guards);
            }
            // Release every acquired partition guard before retrying, so a
            // retry never stacks a second set of guards on top of the first.
            drop(wave_guards);
            std::thread::yield_now();
        }
        panic!(
            "Core::begin_batch: exceeded {} retries — pathological concurrent \
             register/union/split activity racing with closure-form batch entry",
            crate::subgraph::MAX_LOCK_RETRIES
        );
    }

    /// Begin a batch scoped to the partitions transitively touched from
    /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
    /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
    /// reachable partition, and acquires each in ascending
    /// [`crate::subgraph::SubgraphId`] order via
    /// [`Core::partition_wave_owner_lock_arc`].
    ///
    /// Two threads with disjoint touched-partition sets run truly
    /// parallel — the per-partition `wave_owner` mutexes don't block
    /// each other. This is the canonical Y1 parallelism win for
    /// per-seed wave-driving entry points (subscribe, emit, pause,
    /// resume, invalidate, complete, error, teardown,
    /// set_deps push-on-subscribe).
    ///
    /// **QA-fix #2 (2026-05-09):** retry-validate the touched-partition
    /// set against the registry epoch — same protection as
    /// [`Self::begin_batch`] but scoped to a per-seed touched set
    /// rather than every partition. Conservative: any registry
    /// mutation (even on a partition unrelated to seed's touched set)
    /// triggers a retry. This avoids a precise "did MY touched set
    /// change?" check at the cost of occasional spurious retries.
    ///
    /// # Panics
    ///
    /// Panics if the registry-epoch retry-validate loop exceeds
    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, if a partition
    /// acquire returns a `PartitionOrderViolation` (Phase H+ STRICT,
    /// D115), or if [`Core::partition_wave_owner_lock_arc`] panics on an
    /// unregistered seed. The retry bound and the unregistered-seed case
    /// are unreachable in correct call paths (the P12 invariant guarantees
    /// registry membership matches `s.nodes`).
    ///
    /// Slice Y1 / Phase E (2026-05-08); QA-fix #2 (2026-05-09).
    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
        match self.try_begin_batch_for(seed) {
            Ok(guard) => guard,
            Err(e) => panic!("{e}"),
        }
    }

    /// Fallible variant of `begin_batch_for`. Returns `Err` if any
    /// partition acquire violates ascending order (Phase H+ STRICT,
    /// D115); partitions already acquired in that attempt are released
    /// before returning. Retry exhaustion still panics, exactly as in
    /// `begin_batch_for`. Used by `try_run_wave_for`; the public
    /// `begin_batch_for` calls this and panics on `Err`.
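    ///
    /// The per-thread fast path is an epoch-keyed memo. A minimal sketch,
    /// assuming a hypothetical `Cache` type in place of `PartitionCache`
    /// and `u32` partition ids: a cached result is reused only when the
    /// (core generation, seed, registry epoch) key matches exactly, so any
    /// registry mutation forces a fresh walk.
    ///
    /// ```
    /// use std::cell::RefCell;
    ///
    /// struct Cache {
    ///     key: (u64, u32, u64), // (core generation, seed, registry epoch)
    ///     partitions: Vec<u32>,
    /// }
    ///
    /// thread_local! {
    ///     static CACHE: RefCell<Option<Cache>> = RefCell::new(None);
    /// }
    ///
    /// fn touched(
    ///     generation: u64,
    ///     seed: u32,
    ///     epoch: u64,
    ///     walk: impl FnOnce() -> Vec<u32>,
    /// ) -> Vec<u32> {
    ///     let key = (generation, seed, epoch);
    ///     let hit = CACHE.with(|c| match &*c.borrow() {
    ///         Some(entry) if entry.key == key => Some(entry.partitions.clone()),
    ///         _ => None,
    ///     });
    ///     hit.unwrap_or_else(|| {
    ///         // Miss (or stale key): run the expensive walk and remember it.
    ///         let partitions = walk();
    ///         let entry = Cache { key, partitions: partitions.clone() };
    ///         CACHE.with(|c| *c.borrow_mut() = Some(entry));
    ///         partitions
    ///     })
    /// }
    /// ```
    ///
    /// Two calls with the same key run the walk once; bumping the epoch
    /// runs it again.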
    pub(crate) fn try_begin_batch_for(
        &self,
        seed: crate::handle::NodeId,
    ) -> Result<BatchGuard, crate::node::PartitionOrderViolation> {
        let core_generation = self.generation;
        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
            let epoch_before = self.registry.lock().epoch();
            // Fast-path: per-thread partition cache. On repeated emits to
            // the same seed (the dominant hot-loop pattern), skip the BFS
            // in compute_touched_partitions — it acquires state + registry
            // locks and allocates a HashSet + SmallVec per call. The cache
            // is valid as long as the registry epoch hasn't changed (no
            // register/union/split since the cache was populated).
            let touched = PARTITION_CACHE
                .with(|cell| {
                    let cache = cell.borrow();
                    if let Some(ref c) = *cache {
                        if c.core_generation == core_generation
                            && c.seed == seed
                            && c.epoch == epoch_before
                        {
                            return Some(c.partitions.clone());
                        }
                    }
                    None
                })
                .unwrap_or_else(|| {
                    let result = self.compute_touched_partitions(seed);
                    PARTITION_CACHE.with(|cell| {
                        *cell.borrow_mut() = Some(PartitionCache {
                            core_generation,
                            seed,
                            epoch: epoch_before,
                            partitions: result.clone(),
                        });
                    });
                    result
                });
            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
            let mut partition_err = None;
            for sid in &touched {
                let representative = crate::handle::NodeId::new(sid.raw());
                match self.partition_wave_owner_lock_arc(representative) {
                    Ok(guard) => wave_guards.push(guard),
                    Err(e) => {
                        partition_err = Some(e);
                        break;
                    }
                }
            }
            // Drop wave_guards on error — release any already-acquired partitions.
            if let Some(e) = partition_err {
                drop(wave_guards);
                return Err(e);
            }
            let epoch_after = self.registry.lock().epoch();
            if epoch_after == epoch_before {
                return Ok(self.begin_batch_with_guards(wave_guards));
            }
            // Epoch changed — invalidate cache and retry.
            PARTITION_CACHE.with(|cell| {
                *cell.borrow_mut() = None;
            });
            drop(wave_guards);
            std::thread::yield_now();
        }
        panic!(
            "Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
             pathological concurrent register/union/split activity racing \
             with per-seed batch entry",
            crate::subgraph::MAX_LOCK_RETRIES
        );
    }

    /// Is this thread currently inside an owning wave on this Core?
    /// Per-(Core, thread) — see [`IN_TICK_OWNED`]. Read on the wave-owner
    /// thread (e.g. by `commit_emission` to decide cache-snapshot taking).
    /// `#[must_use]`: a discarded result silently loses the
    /// ownership/nesting decision (a classic predicate-misuse bug).
    #[must_use]
    fn in_tick(&self) -> bool {
        IN_TICK_OWNED.with(|s| s.borrow().contains(&self.generation))
    }

    /// Claim wave ownership for this (Core, thread). Returns `true` iff
    /// this call is the outermost entry (slot was absent) — i.e.
    /// `owns_tick`; `false` for nested same-(Core, thread) re-entry.
    /// `AHashSet::insert` returns `true` exactly when the value was newly
    /// inserted, which is precisely the `owns_tick` semantics.
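    ///
    /// A minimal sketch of the per-(Core, thread) ownership slot, assuming
    /// `std::collections::HashSet` in place of `AHashSet` and plain `u64`
    /// generations; `OWNED`, `claim`, and `clear` are hypothetical stand-ins
    /// for `IN_TICK_OWNED`, `claim_in_tick`, and `clear_in_tick`.
    ///
    /// ```
    /// use std::cell::RefCell;
    /// use std::collections::HashSet;
    ///
    /// thread_local! {
    ///     // Generations of the Cores whose wave this thread currently owns.
    ///     static OWNED: RefCell<HashSet<u64>> = RefCell::new(HashSet::new());
    /// }
    ///
    /// // `true` only for the outermost entry (the generation was absent).
    /// fn claim(generation: u64) -> bool {
    ///     OWNED.with(|s| s.borrow_mut().insert(generation))
    /// }
    ///
    /// // Removing an absent key is a no-op, so this is idempotent.
    /// fn clear(generation: u64) {
    ///     OWNED.with(|s| {
    ///         s.borrow_mut().remove(&generation);
    ///     });
    /// }
    /// ```
    ///
    /// Claiming the same generation twice on one thread returns `true` then
    /// `false`; a different generation, or the same one on another thread,
    /// gets its own independent `true`.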
    fn claim_in_tick(&self) -> bool {
        IN_TICK_OWNED.with(|s| s.borrow_mut().insert(self.generation))
    }

    /// Release wave ownership for this (Core, thread). Called only by the
    /// owning [`BatchGuard::drop`] (after its `!owns_tick` early-return, so
    /// a nested guard never releases), explicitly at each of three exit
    /// points, always AFTER the wave drain + WaveState cleanup and BEFORE
    /// `fire_deferred` (so a re-entrant sink emit runs as a fresh owning
    /// wave):
    ///
    /// 1. the closure-body-panic branch;
    /// 2. the drain-phase-panic `catch_unwind` arm (before `resume_unwind`);
    /// 3. the success path's locked cleanup block.
    ///
    /// Released exactly once per (Core, thread, wave), and idempotent
    /// regardless (`AHashSet::remove` of an absent key is a no-op).
    fn clear_in_tick(&self) {
        IN_TICK_OWNED.with(|s| {
            s.borrow_mut().remove(&self.generation);
        });
    }

    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
    /// with the supplied (already-acquired) partition wave-owner guards.
    /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
    /// order (the canonical lock-acquisition order) — both
    /// [`Self::begin_batch`] (all-partitions) and
    /// [`Self::begin_batch_for`] (touched-partitions) construct the
    /// vector in that order before calling here.
    fn begin_batch_with_guards(
        &self,
        wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    ) -> BatchGuard {
        // Claim wave ownership for this (Core, thread). Keyed per-(Core,
        // thread) in the `IN_TICK_OWNED` thread_local (see its doc for
        // the cross-Core / disjoint-partition / nested-re-entry rationale)
        // — no state lock needed, since `in_tick` has no cross-thread read
        // requirement.
        let owns_tick = self.claim_in_tick();
        // D1 patch (2026-05-09): defensive wave-start clear of the
        // per-thread Slice G tier3 tracker on outermost owning entry.
        // The thread-local is cleared at outermost BatchGuard drop on
        // both success + panic paths; this start-clear is
        // belt-and-suspenders against panic paths that bypass Drop
        // (catch_unwind can interleave with thread reuse — e.g. cargo's
        // test-runner thread pool — and propagate stale entries from a
        // prior panicked test's wave that didn't fully unwind through
        // BatchGuard::drop).
        if owns_tick {
            tier3_clear();
            // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
            // clear of WaveState's non-retain-holding fields. Mirrors the
            // tier3 defensive-clear above. Retain-holding fields
            // (wave_cache_snapshots / deferred_handle_releases) MUST be
            // empty here — outermost BatchGuard::drop drains them on both
            // success + panic paths.
            wave_state_clear_outermost();
        }
        BatchGuard {
            core: self.clone(),
            owns_tick,
            wave_guards,
            _not_send: std::marker::PhantomData,
        }
    }
}

/// RAII guard returned by [`Core::begin_batch`] and [`Core::begin_batch_for`].
///
/// While alive, suppresses per-emit wave drains — multiple `emit` /
/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
/// wave. On drop:
/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
///   in-tick).
/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
///   the in-tick flag): silently no-ops.
///
/// On thread panic during the closure body, the drop path discards pending
/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
/// State-node cache writes that already executed inside the closure are
/// rolled back via wave-cache snapshots — `cache_of(s)` returns the
/// pre-panic value. The atomicity guarantee covers both sink-observability
/// and cache state.
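///
/// The outermost-drains / nested-no-ops contract can be modeled in a few
/// lines. This is a hypothetical miniature (`Wave` and `Guard` are not this
/// crate's types, and partition locks and cache rollback are omitted): the
/// first guard flips an `open` flag and becomes the owner, nested guards
/// see the flag already set and do nothing on drop, and the owner delivers
/// everything queued as one batch unless the thread is panicking, in which
/// case the queue is discarded.
///
/// ```
/// use std::cell::{Cell, RefCell};
///
/// struct Wave {
///     open: Cell<bool>,
///     queued: RefCell<Vec<u32>>,
///     delivered: RefCell<Vec<Vec<u32>>>,
/// }
///
/// struct Guard<'a> {
///     wave: &'a Wave,
///     owns: bool,
/// }
///
/// impl Wave {
///     fn begin(&self) -> Guard<'_> {
///         // Only the guard that flips `open` from false to true owns the wave.
///         let owns = !self.open.replace(true);
///         Guard { wave: self, owns }
///     }
///
///     fn emit(&self, v: u32) {
///         self.queued.borrow_mut().push(v);
///     }
/// }
///
/// impl Drop for Guard<'_> {
///     fn drop(&mut self) {
///         if !self.owns {
///             return; // nested guard: the outer scope drains
///         }
///         let batch = std::mem::take(&mut *self.wave.queued.borrow_mut());
///         if !std::thread::panicking() {
///             // One coalesced delivery for the whole wave.
///             self.wave.delivered.borrow_mut().push(batch);
///         }
///         self.wave.open.set(false);
///     }
/// }
/// ```
///
/// Emitting 1 under an outer guard, 2 under a nested guard, and 3 after the
/// nested guard drops produces nothing until the outer guard drops, then a
/// single delivery of `[1, 2, 3]`.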
///
/// # Thread safety
///
/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
/// per-(Core, thread) `in_tick` ownership slot AND the per-partition
/// `wave_owner` re-entrant mutex(es) on the calling thread; sending the
/// guard to another thread and dropping it there would clear `in_tick`
/// against the wrong thread's slot and release the wave-owner guards
/// from a different thread than the one that acquired them, breaking
/// both the per-(Core, thread) "I own the wave scope" semantic and
/// `parking_lot::ReentrantMutex`'s ownership invariant. The `wave_guards`
/// field is a `SmallVec` of `!Send` `ArcReentrantMutexGuard<()>`; the
/// `PhantomData<*const ()>` marker is belt-and-suspenders.
///
/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
/// `SmallVec` of partition wave-owner guards. Closure-form
/// `begin_batch` acquires every current partition (serialization
/// point); `begin_batch_for(seed)` acquires only the transitively
/// touched partitions (parallel for disjoint sets).
///
/// ```compile_fail
/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
/// use std::sync::Arc;
///
/// struct Stub;
/// impl BindingBoundary for Stub {
///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
///         FnResult::Noop { tracked: None }
///     }
///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
///     fn release_handle(&self, _: HandleId) {}
/// }
/// fn requires_send<T: Send>(_: T) {}
/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
/// let guard = core.begin_batch();
/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
/// ```
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
    core: Core,
    owns_tick: bool,
    /// Re-entrant mutex guards held for the wave's duration. One entry
    /// per touched partition's `wave_owner`, in ascending
    /// [`crate::subgraph::SubgraphId`] order. Drop releases the guards in
    /// any order: only acquisition order matters for deadlock freedom,
    /// and every guard is held by the same thread. Cross-thread waves on
    /// any of the held partitions block until our scope ends; cross-thread
    /// waves on partitions NOT in this vector run truly parallel — the
    /// canonical Y1 parallelism property.
    ///
    /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
    /// (and thus `BatchGuard`) is `!Send` at the type level — sending
    /// across threads would violate `parking_lot::ReentrantMutex`'s
    /// thread-ownership invariant.
    wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    _not_send: std::marker::PhantomData<*const ()>,
}
3569
3570impl BatchGuard {
3571 /// Panic-discard cleanup for the owning guard: drop pending wave
3572 /// work, release queued payload + handle retains lock-released,
3573 /// restore pre-wave cache snapshots, clear per-thread `WaveState` +
3574 /// the Slice-G tier3 tracker, and discard deferred producer ops.
3575 ///
3576 /// Shared by BOTH panic origins so a drain-phase fn/sink panic gets
3577 /// the identical `BatchGuard` atomicity guarantee as a closure-body
3578 /// panic: (1) the `std::thread::panicking()` branch (panic propagated
3579 /// from the wave's *closure body* — drop runs during that unwind),
3580 /// and (2) the success-path `catch_unwind` around `drain_and_flush()`
3581 /// (a fn/sink panic that escaped the inner per-call `catch_unwind`
3582 /// isolation while drop was NOT already unwinding). /qa D047.
3583 ///
3584 /// Does NOT release `in_tick` — each `BatchGuard::drop` exit path
3585 /// calls `clear_in_tick()` explicitly, after this cleanup and before
3586 /// `fire_deferred` (so a re-entrant sink emit runs as a fresh owning
3587 /// wave).
    fn discard_wave_cleanup(&self) {
        let (pending, deferred_releases, restored_releases) = {
            let mut s = self.core.lock_state();
            // WaveState is borrowed alongside state for panic-discard
            // cleanup. The WaveState borrow is per-thread and independent
            // of state. `in_tick` is per-(Core, thread) (`IN_TICK_OWNED`)
            // and is released separately by the explicit `clear_in_tick`
            // on each exit path; this method only drains/cleans the
            // per-thread WaveState retain-fields.
            with_wave_state(|ws| {
                let pending = std::mem::take(&mut ws.pending_notify);
                let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
                ws.pending_fires.clear();
                let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
                // clear_wave_state pushes batch-handle releases into
                // ws.deferred_handle_releases, so take ws's queue AFTER
                // the clear.
                s.clear_wave_state(ws);
                ws.clear_wave_state();
                let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
                // Slice E2 (D061): a panic-discarded wave drops queued
                // OnInvalidate cleanup hooks SILENTLY. Bindings that use
                // OnInvalidate for external-resource cleanup MUST perform
                // that cleanup idempotently at process exit or on the next
                // successful invalidate. Mirrors the A3
                // `pending_pause_overflow` panic-discard precedent.
                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
                    std::mem::take(&mut ws.deferred_cleanup_hooks);
                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
                // for the eager-wipe queue. A panic-discarded wave drops
                // queued `wipe_ctx` fires silently; the binding-side
                // `NodeCtxState` entry remains until the next successful
                // terminate-with-no-subs cycle (or until `Core` drops).
                // This mirrors D061's external-resource-cleanup gap and
                // is documented the same way.
                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
                (pending, deferred_releases, restored)
            })
        };
        // Lock dropped: release retains lock-released so the binding
        // can't deadlock against an internal binding mutex.
        for entry in pending.values() {
            for msg in entry.iter_messages() {
                if let Some(h) = msg.payload_handle() {
                    self.core.binding.release_handle(h);
                }
            }
        }
        for h in deferred_releases {
            self.core.binding.release_handle(h);
        }
        for h in restored_releases {
            self.core.binding.release_handle(h);
        }
        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
        // tracker on outermost wave-end (panic-discard path). Otherwise
        // the thread-local outlives the BatchGuard, and cargo's thread
        // reuse across tests would propagate stale entries.
        tier3_clear();
        // Phase H+ STRICT (D115): discard deferred producer ops on
        // panic, releasing their handle retains without firing them.
        // The queue is taken in a single statement so the mutex guard is
        // a temporary dropped before any `release_handle` call. Holding
        // it across the loop could deadlock if a binding finalizer
        // re-enters Core and touches `deferred_producer_ops`, the same
        // hazard the lock-released releases above avoid.
        let discarded = std::mem::take(&mut *self.core.deferred_producer_ops.lock());
        for op in discarded {
            match op {
                crate::node::DeferredProducerOp::Emit { handle, .. }
                | crate::node::DeferredProducerOp::Error { handle, .. } => {
                    self.core.binding.release_handle(handle);
                }
                _ => {} // Complete has no handle; Callback drops naturally
            }
        }
    }
}
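
// Illustrative sketch only; not part of this crate's API. A minimal,
// single-threaded model of the "take under the lock, release after the
// lock is dropped" shape that `discard_wave_cleanup` follows. `ToyState`,
// `ToyOp`, and `discard_wave` are hypothetical stand-ins: a
// `std::sync::Mutex` plays the state lock, a `u64` plays a payload
// handle, and the `release` callback plays
// `BindingBoundary::release_handle`. The queues are moved out before the
// guard drops, so a `release` that re-locks the same mutex (as a
// re-entrant binding finalizer might) cannot self-deadlock. `ToyOp`
// mirrors the panic-discard match: `Emit` and `Error` carry a handle to
// release, and `Complete` carries none.
#[allow(dead_code)]
mod lock_released_release_sketch {
    use std::sync::Mutex;

    /// Hypothetical deferred op; only some variants retain a handle.
    pub enum ToyOp {
        Emit { handle: u64 },
        Error { handle: u64 },
        Complete,
    }

    /// Hypothetical stand-in for the state guarded by the wave lock.
    #[derive(Default)]
    pub struct ToyState {
        pub pending: Vec<u64>,        // retained payload handles queued this wave
        pub deferred_ops: Vec<ToyOp>, // deferred producer ops
        pub released: Vec<u64>,       // handles the toy "binding" released, in order
    }

    /// Discards a wave: drain both queues under the lock, then release
    /// every retained handle with the lock already dropped.
    pub fn discard_wave(state: &Mutex<ToyState>, release: impl Fn(&Mutex<ToyState>, u64)) {
        let (pending, ops) = {
            let mut s = state.lock().unwrap();
            (std::mem::take(&mut s.pending), std::mem::take(&mut s.deferred_ops))
        }; // The guard `s` is dropped here, before any release runs.
        for h in pending {
            release(state, h);
        }
        for op in ops {
            match op {
                ToyOp::Emit { handle } | ToyOp::Error { handle } => release(state, handle),
                ToyOp::Complete => {} // no handle, nothing to release
            }
        }
    }
}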

impl Drop for BatchGuard {
    fn drop(&mut self) {
        if !self.owns_tick {
            // Nested / non-owning guard: it never claimed ownership, so it
            // must never release it. The owning guard's exit paths (below)
            // are the only places `in_tick` is cleared.
            return;
        }
        // Wave-ownership (`in_tick`) release discipline. `clear_in_tick`
        // must run AFTER the wave's drain + WaveState cleanup but BEFORE
        // `fire_deferred` (sinks), on every exit path:
        //
        // - **Before `fire_deferred` (load-bearing):** a sink re-entering
        //   `Core::emit` / `complete` from a flush callback must run as a
        //   fresh OWNING wave (so its own emissions drain + deliver). If
        //   `in_tick` were still owned during `fire_deferred`, that
        //   re-entrant emit would be a non-owning no-op and its data
        //   silently lost (regression caught by
        //   `lock_discipline::sink_can_reenter_core_via_emit`). This is
        //   why each path clears explicitly at the right point, NOT via
        //   an end-of-`drop` RAII guard (which would clear *after*
        //   `fire_deferred`).
        // - **/qa hardening (D047):** a fn/sink panic in the drain phase
        //   can escape the per-call `catch_unwind` isolation (e.g. a
        //   derived fn panicking when fired). Drop is NOT already
        //   unwinding, so it would otherwise skip BOTH the WaveState
        //   drain (→ next owning wave trips `wave_state_clear_outermost`)
        //   AND the `in_tick` clear (pre-D047 the explicit clear had this
        //   same window). Catching the drain panic, running the shared
        //   discard cleanup + `clear_in_tick`, then `resume_unwind` gives
        //   a drain-phase panic the same atomicity as a closure-body
        //   panic.
        if std::thread::panicking() {
            // Closure-body panic: drop runs during that unwind. Discard
            // pending wave work (don't fire sinks mid-unwind; a sink panic
            // at that point would abort the process), release queued
            // retains, restore caches, then release ownership.
            self.discard_wave_cleanup();
            self.core.clear_in_tick();
            return;
        }
        if let Err(payload) =
            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.core.drain_and_flush()))
        {
            self.discard_wave_cleanup();
            self.core.clear_in_tick();
            std::panic::resume_unwind(payload);
        }
        // Wave cleanup + extract deferred jobs under the lock.
        let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
            let mut s = self.core.lock_state();
            // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState is
            // borrowed alongside state for wave-end cleanup. Per-thread;
            // independent of state. Sub-slice 3 moved the deferred_*
            // drains into WaveState. /qa F1+F2 (2026-05-10) reverted
            // in_tick + currently_firing back to CoreState; they are
            // cleared via CoreState::clear_wave_state under the held
            // state lock.
            let result = with_wave_state(|ws| {
                s.clear_wave_state(ws);
                ws.clear_wave_state();
                // /qa A1 (2026-05-09) discipline preserved: drain snapshot
                // retains under the lock and release them lock-released
                // below, to avoid binding re-entrance under a held mutex /
                // borrow.
                let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
                // `drain_deferred` takes `deferred_flush_jobs` +
                // `deferred_handle_releases` (incl. rotation releases pushed
                // by `clear_wave_state` above) + Slice E2
                // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
                // `pending_wipes`, all from WaveState post-Sub-slice-3.
                let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
                (jobs, releases, hooks, wipes, snapshot_releases)
            });
            // Release wave ownership now: AFTER drain + WaveState
            // cleanup, BEFORE `fire_deferred` below. Load-bearing: a sink
            // re-entering Core from a flush callback must observe
            // `in_tick` clear so its emit runs as a fresh owning wave.
            // (Mirrors the placement of the pre-D047 `s.in_tick = false`;
            // the drain-phase-panic window that placement had is closed
            // by the `catch_unwind` above.)
            self.core.clear_in_tick();
            result
        };
        // Lock dropped: fire deferred sinks + release retains + fire
        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
        // + fire eager wipes (D069).
        self.core.fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
        // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
        // lock-released. Pre-A1 these were released inside the held
        // state + cross_partition locks, so binding finalizers re-entering
        // Core would deadlock against either mutex. They are drained
        // earlier under the lock and released here, after both mutexes
        // are dropped and sinks have fired.
        for h in snapshot_releases {
            self.core.binding.release_handle(h);
        }
        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
        // tracker at outermost wave-end (success path). Mirrors the
        // panic-discard path in `discard_wave_cleanup`. The thread-local
        // outlives BatchGuard by default; cargo's thread reuse across
        // tests would propagate stale entries. Cleared after sinks fire
        // (sink callbacks may re-enter Core via emit and could read the
        // tier3 set mid-wave; the wave is over here, so clearing is safe).
        tier3_clear();
        // QA-fix group 2 (2026-05-09): explicitly drop the wave guards
        // in REVERSE acquisition order. `parking_lot::ReentrantMutex`
        // doesn't care about release order for same-thread holders, but
        // a future migration to a non-reentrant lock (or one with a
        // Drop side-effect tied to ordering) would silently break if we
        // relied on `SmallVec`'s default forward-iteration drop. The
        // ascending-acquire / descending-release pattern is the
        // canonical lock-discipline shape.
        while let Some(guard) = self.wave_guards.pop() {
            drop(guard);
        }
        // Phase H+ STRICT (D115): drain deferred producer ops now that
        // THIS BatchGuard's wave_guards are released. If an outer scope
        // still holds partitions (e.g., try_subscribe's `_wave_guard`),
        // draining here would re-enter Core::subscribe / emit while those
        // partitions are still in held_partitions, tripping the
        // ascending-order check. Because this call is unconditional,
        // `drain_deferred_producer_ops` itself must leave the ops queued
        // in that case; the outermost BatchGuard (whose drop runs with no
        // outer partitions held) then drains them.
        self.core.drain_deferred_producer_ops();
    }
}
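
// Illustrative sketch only; not part of this crate's API. A minimal,
// single-threaded model of the exit-path ordering `BatchGuard::drop`
// relies on. `ToyCore`, `ToyGuard`, `batch`, `emit`, `drain`, and
// `discard` are hypothetical names: a `Cell<bool>` stands in for
// `in_tick`, `Vec<u32>` queues stand in for WaveState and the deferred
// sink jobs, and `panic_on` simulates a fn panicking during the drain.
// It models three rules. A closure-body panic is handled in the
// `thread::panicking()` branch. A drain-phase panic is caught, cleaned
// up the same way, then re-raised with `resume_unwind`. Ownership is
// released BEFORE sinks fire, so a sink that re-enters `emit` runs its
// own owning wave. Moving `in_tick.set(false)` below the sink loop would
// leave the re-entrant value sitting undrained in `pending`.
#[allow(dead_code)]
mod batch_guard_drop_order_sketch {
    use std::cell::{Cell, RefCell};
    use std::panic::{self, AssertUnwindSafe};

    pub type Sink = Box<dyn Fn(&ToyCore, u32)>;

    #[derive(Default)]
    pub struct ToyCore {
        pub in_tick: Cell<bool>,
        pub pending: RefCell<Vec<u32>>,   // queued by the wave body
        pub deferred: RefCell<Vec<u32>>,  // drained values awaiting sinks
        pub delivered: RefCell<Vec<u32>>, // what sinks have observed
        pub panic_on: Cell<Option<u32>>,  // simulates a fn panicking in drain
        pub sinks: RefCell<Vec<Sink>>,
    }

    struct ToyGuard<'a> {
        core: &'a ToyCore,
        owns_tick: bool,
    }

    impl ToyCore {
        /// Runs `body` inside a wave; only the outermost guard owns it.
        pub fn batch(&self, body: impl FnOnce()) {
            let owns_tick = !self.in_tick.replace(true);
            let _guard = ToyGuard { core: self, owns_tick };
            body();
        }

        pub fn emit(&self, v: u32) {
            self.batch(|| self.pending.borrow_mut().push(v));
        }

        fn drain(&self) {
            let pending = std::mem::take(&mut *self.pending.borrow_mut());
            for v in pending {
                if self.panic_on.get() == Some(v) {
                    panic!("fn panicked on {v}");
                }
                self.deferred.borrow_mut().push(v);
            }
        }

        fn discard(&self) {
            self.pending.borrow_mut().clear();
            self.deferred.borrow_mut().clear();
        }
    }

    impl Drop for ToyGuard<'_> {
        fn drop(&mut self) {
            if !self.owns_tick {
                return; // nested guard: never claimed ownership, never releases it
            }
            let core = self.core;
            if std::thread::panicking() {
                // Closure-body panic: discard and release; unwinding continues.
                core.discard();
                core.in_tick.set(false);
                return;
            }
            if let Err(payload) = panic::catch_unwind(AssertUnwindSafe(|| core.drain())) {
                // Drain-phase panic: identical cleanup, then re-raise.
                core.discard();
                core.in_tick.set(false);
                panic::resume_unwind(payload);
            }
            let jobs = std::mem::take(&mut *core.deferred.borrow_mut());
            core.in_tick.set(false); // release BEFORE sinks fire (load-bearing)
            for v in jobs {
                core.delivered.borrow_mut().push(v);
                for sink in core.sinks.borrow().iter() {
                    sink(core, v);
                }
            }
        }
    }
}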