graphrefly_core/batch.rs
1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//! runs `op` lock-released, then drains all transitive fn-fires and
13//! flushes per-subscriber notifications. Each fn-fire iteration drops
14//! the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//! can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//! the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//! queueing + child propagation. `&self`-only; bracket-fires
20//! `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//! pause-buffer routing. Snapshots the subscriber list at first-touch-
23//! per-wave so late subscribers (installed mid-wave between drain
24//! iterations) don't receive duplicate deliveries from messages already
25//! queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//! the consumer for fn-fire if its tracked-deps set is satisfied.
28//! Called from `commit_emission`, plus `activate_derived` and
29//! `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//! discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//! user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//! `invalidate` / `complete` / `error` / `teardown` and run a nested
38//! wave (the existing `s.in_tick` re-entrance gate composes
39//! transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//! check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//! It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//! `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//! tier-split. Re-entrance from a handshake sink callback panics with
46//! the [`reentrance_guard`] diagnostic.
47
48use std::cell::RefCell;
49use std::collections::HashMap;
50use std::sync::Arc;
51
52use ahash::AHashSet;
53use indexmap::map::Entry;
54use indexmap::IndexMap;
55
56use smallvec::SmallVec;
57
58use crate::boundary::{DepBatch, FnEmission, FnResult};
59use crate::handle::{HandleId, NodeId, NO_HANDLE};
60use crate::message::Message;
61use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
62
63// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
64//
65// **Wave scope = thread-local.** GraphReFly's wave-engine guarantees
66// that every emit at a given node within a single wave runs on the
67// same thread (the thread that holds the partition's `wave_owner`
68// `parking_lot::ReentrantMutex` — cross-thread emits at a node BLOCK
69// on that mutex and so always land in the OTHER thread's wave). A
70// wave is bounded above by the outermost `BatchGuard` drop on its
71// originating thread. Together this means a per-thread
72// `AHashSet<NodeId>` is the natural placement for "has node X already
73// emitted a tier-3 message in this wave?" — the set's lifetime
74// exactly matches the wave's, with no cross-thread or cross-wave
75// contamination.
76//
77// **History (D1 patch, 2026-05-09):** previously placed on
78// `crate::subgraph::SubgraphLockBox::state` per-partition (Q3 v1).
79// That placement was robust to per-partition wave parallelism but
80// vulnerable to mid-wave cross-thread `set_deps` partition splits:
81// thread A is mid-wave on partition P (wave_owner held) but between
82// fn fires (`currently_firing` empty); thread B's `set_deps` acquires
83// the state lock, P13's `currently_firing.is_empty()` check
84// short-circuits, the split proceeds, and X migrates from P to a
85// fresh orphan-side partition with an empty
86// `tier3_emitted_this_wave`. Thread A's subsequent emit at X then
87// mis-detects "first emit" and queues a Resolved alongside the prior
88// Data — R1.3.3.a violation. Thread-local placement is immune to this
89// hazard: thread B's split doesn't touch thread A's thread-local at
90// all.
91//
92// **Lifecycle:** populated by `Core::commit_emission` /
93// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
94// `BatchGuard` drop on this thread (both success and panic-discard
95// paths). Re-entrant nested waves on the same thread share the set —
96// inner-wave emits add to the same set; the outermost drop is the
97// canonical clear point. Cross-thread emits NEVER touch this thread's
98// set (they serialize on the partition wave_owner; the cross-thread
99// emit happens in the OTHER thread's emit-loop and uses the OTHER
100// thread's tier3 thread-local).
thread_local! {
    // Nodes that have already emitted a tier-3 message during the wave
    // currently running on this thread. Written by `tier3_mark`, read by
    // `tier3_check`, emptied by `tier3_clear`.
    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}
104
105// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
106//
107// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
108// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
109// identical to thread_local borrow_mut. The "mutex hop is slow" intuition
110// is wrong UNCONTENDED.
111// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
112// than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
113// cache-line bouncing on the lock state itself.
114// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
115// dominated by cache-line bouncing across cores, NOT by single-thread
116// mutex acquire overhead. Moving the four wave-scoped fields to a
117// per-thread thread_local eliminates the bounce point entirely.
118//
119// **Wave scope = thread, same as `TIER3_EMITTED_THIS_WAVE`:** every emit
120// in a wave runs on the thread that holds the partition wave_owner;
121// cross-thread emits BLOCK on wave_owner and so always land in the OTHER
122// thread's wave context with the OTHER thread's WAVE_STATE. Mid-wave
123// cross-thread `set_deps` partition splits don't touch this thread's
124// thread-local at all (D1 lesson, applied here).
125//
126// **Lifecycle:** populated by `Core::commit_emission` /
127// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
128// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
129// releases any retained handles still in `wave_cache_snapshots` /
130// `deferred_handle_releases`. Defensive wave-start clear at outermost
131// owning BatchGuard entry guards against cargo's thread-reuse propagating
132// stale entries from a prior panicked-mid-wave test.
thread_local! {
    // This thread's wave-scoped bookkeeping. Initialised as an empty
    // `WaveState`; mutable access goes through `with_wave_state`.
    static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
}
136
/// Wave-scoped state previously held under [`Core::cross_partition`]'s
/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
/// `pending_notify`, 2026-05-09; Sub-slice 3 added
/// `deferred_flush_jobs`, `deferred_cleanup_hooks`, `pending_wipes`,
/// `invalidate_hooks_fired_this_wave`, 2026-05-09). Sub-slice 3 also
/// moved `currently_firing` and `in_tick` here, but both were moved
/// BACK to `CoreState` on 2026-05-10 (/qa F1+F2 — see the note inside
/// the struct body); they are NOT fields of this struct.
///
/// All fields are populated and drained within one wave on one thread.
/// Cross-thread access is structurally impossible — cross-thread emits
/// block on partition `wave_owner` and land in the OTHER thread's wave
/// context.
///
/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
/// `deferred_handle_releases`, and `pending_notify` hold binding-side
/// handle retains. They MUST be drained (and released through
/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
/// on success and panic paths. `pending_notify` holds one retain per
/// payload-bearing message (one per `Message::payload_handle()`); the
/// retains are taken in `Core::queue_notify` and balanced either by
/// `flush_notifications` (success path: pushed into
/// `deferred_handle_releases`) or directly in the panic-discard path of
/// `BatchGuard::drop` (taken from `pending_notify` and released).
///
/// The thread_local has no `Drop` hook with access to a binding — a
/// panic that bypasses `BatchGuard::drop` (e.g. panic OUTSIDE any batch)
/// would leak retains until the thread exits. The retain-holding fields
/// are deliberately never cleared without a binding: clearing without
/// releasing would lose the retain for good.
///
/// The defensive wave-start clear (`wave_state_clear_outermost`, called
/// from `BatchGuard::begin_batch_with_guards`) clears
/// `pending_auto_resolve` + `pending_pause_overflow` +
/// `invalidate_hooks_fired_this_wave` (none of which hold retains). It
/// does NOT clear `pending_fires` (an entry may have been pre-staged
/// outside a wave by `Core::resume`), and it does NOT clear the
/// retain-holding fields — those must be empty by construction at
/// outermost wave start (a prior wave's panic-discard path drained them,
/// or a prior wave's success path drained them); debug builds assert
/// this.
pub(crate) struct WaveState {
    /// Payload-handle releases owed for messages that landed in
    /// `pending_notify` during this wave (one per `payload_handle()`).
    /// `BatchGuard::drop` releases these after sinks fire and the lock
    /// is dropped, balancing the retain done in `queue_notify`.
    pub(crate) deferred_handle_releases: Vec<HandleId>,
    /// Pre-wave cache snapshots used to restore state if the wave aborts
    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
    /// the wave started writing to it. The snapshotted handle holds a
    /// retain (taken when the snapshot was inserted) so it stays alive
    /// for restoration. On wave success, snapshots are drained and their
    /// retains released. On wave abort, each cache slot is restored from
    /// the snapshot and the original retain transfers to the cache slot.
    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
    /// Nodes that need an auto-Resolved at wave end if they don't receive
    /// a tier-3+ message from their own commit_emission. Populated by
    /// the RESOLVED child propagation in `commit_emission`. Drained by
    /// the auto-resolve sweep in `drain_and_flush`.
    pub(crate) pending_auto_resolve: AHashSet<NodeId>,
    /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
    /// [`Core::queue_notify`] when the pause buffer first overflows;
    /// drained at wave-end after the lock-released call to
    /// `BindingBoundary::synthesize_pause_overflow_error`.
    pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
    /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
    /// child-cascade `QueueFire` branch, `activate_derived`'s producer
    /// queueing, `resume`'s pending-wave consolidation, and operator
    /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
    /// `fire_regular` / `fire_operator` (each removes the firing node
    /// before invoking).
    ///
    /// Neither `clear_wave_state` nor the outermost wave-start clear
    /// touches this set: `Core::resume` may stage an entry outside any
    /// wave that the next wave's drain must still see.
    pub(crate) pending_fires: AHashSet<NodeId>,
    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
    /// ordered so flush order is deterministic — load-bearing for
    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
    /// companion both have queued messages in the same wave, the meta
    /// (queued first via `teardown_inner`'s recursion order) flushes
    /// first.
    ///
    /// Each entry carries the per-wave subscriber snapshot taken at first
    /// touch (Slice A close, M1: lock-released drain). Late subscribers
    /// installed mid-wave between fn-fire iterations don't appear in
    /// already-snapshotted entries; this is the load-bearing fix that
    /// prevents duplicate-Data delivery when a handshake delivers the
    /// post-commit cache and the wave's flush would otherwise also fire
    /// to the same sink.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_notify` to per-thread `WaveState`. The map
    /// holds a payload-handle retain per payload-bearing message
    /// (`Message::payload_handle()`); these MUST be released by the
    /// outermost `BatchGuard::drop` (success path: through
    /// `flush_notifications` → `deferred_handle_releases`; panic path:
    /// directly in `BatchGuard::drop`'s panic branch).
    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
    // Q-beyond Sub-slice 3 (D108, 2026-05-09 → /qa F1+F2 reverted
    // 2026-05-10): `in_tick` and `currently_firing` were moved from
    // `CoreState` to per-thread `WaveState` in sub-slice 3, then moved
    // BACK to `CoreState` after /qa surfaced two architectural
    // regressions:
    //
    // - **F1 (in_tick):** per-thread placement broke cross-Core
    //   same-thread BatchGuard isolation. A thread holding a live
    //   BatchGuard on Core-A and starting a wave on Core-B would
    //   read `ws.in_tick = true` (set by Core-A) → Core-B becomes
    //   non-owning → Core-B's writes get drained by Core-A's
    //   binding (silent corruption). Per-Core placement on
    //   `CoreState::in_tick` restores cross-Core isolation.
    //
    // - **F2 (currently_firing):** per-thread placement silently
    //   bypassed the cross-thread P13 partition-migration check in
    //   `Core::set_deps`. Pre-sub-slice-3 the shared `s.currently_firing`
    //   was readable cross-thread; thread B's set_deps could observe
    //   thread A's firing pushes. Per-thread placement made thread B
    //   read its own empty stack → P13 silently bypassed for
    //   cross-thread set_deps. Per-Core placement restores the D091
    //   safety check.
    //
    // The remaining wave-scoped fields (everything declared in this
    // struct) stay per-thread because they're accessed only by the
    // wave-owner thread under `wave_owner` discipline (cross-thread
    // emits BLOCK on partition wave_owner).
    /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
    /// for `OnInvalidate` cleanup hook firing. A node already in this
    /// set this wave has already had its `OnInvalidate` queued into
    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
    /// `invalidate_inner` re-encounters it.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
    /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
    /// cleared by `WaveState::clear_wave_state` (and defensively by the
    /// outermost wave-start clear).
    pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
    /// Deferred sink-fire jobs collected by `flush_notifications`.
    /// `flush_notifications` populates this from `pending_notify`;
    /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
    /// each entry lock-released. Each tuple is
    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
    /// waves.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
    /// retains held — the `Vec<Sink>` clones own Arcs that drop
    /// naturally; the `Vec<Message>` payload retains were already moved
    /// into `deferred_handle_releases` by `flush_notifications`.
    pub(crate) deferred_flush_jobs: DeferredJobs,
    /// Slice E2 (per D060/D061): lock-released drain queue for
    /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
    /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
    /// drained after the lock drops at wave boundary by
    /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
    /// D060). Panic-discarded silently per D061.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
    /// `Core::terminate_node` when a resubscribable node terminates with
    /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
    /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
    /// silently.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::pending_wipes` to per-thread `WaveState`.
    pub(crate) pending_wipes: Vec<NodeId>,
}
302
303impl WaveState {
304 fn new() -> Self {
305 Self {
306 deferred_handle_releases: Vec::new(),
307 wave_cache_snapshots: HashMap::new(),
308 pending_auto_resolve: AHashSet::new(),
309 pending_pause_overflow: Vec::new(),
310 pending_fires: AHashSet::new(),
311 pending_notify: IndexMap::new(),
312 invalidate_hooks_fired_this_wave: AHashSet::new(),
313 deferred_flush_jobs: Vec::new(),
314 deferred_cleanup_hooks: Vec::new(),
315 pending_wipes: Vec::new(),
316 }
317 }
318
319 /// Wave-end clear of the non-retain-holding fields. Called from
320 /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
321 /// (`wave_cache_snapshots`, `deferred_handle_releases`,
322 /// `pending_notify`) are NOT cleared here — they follow the
323 /// success/panic paths' explicit drain discipline in
324 /// `BatchGuard::drop`.
325 pub(crate) fn clear_wave_state(&mut self) {
326 self.pending_auto_resolve.clear();
327 // pending_pause_overflow is normally drained by drain_and_flush
328 // via the synthesis loop. If a wave is panic-discarded BEFORE
329 // synthesis runs, BatchGuard::drop's panic path also clears it
330 // explicitly. Pre-wave defensive clear in
331 // `begin_batch_with_guards` makes this idempotent.
332 self.pending_pause_overflow.clear();
333 // Sub-slice 2: pending_fires is intentionally NOT cleared
334 // here. Two reasons:
335 // 1. Wave-success drain empties it by construction: every
336 // `pick_next_fire` selection is removed by
337 // `fire_regular` / `fire_operator` before invocation,
338 // and `drain_and_flush` only exits when the set is empty.
339 // 2. The `Core::resume` default-mode consolidated-fire
340 // pattern stages an entry OUTSIDE any in-tick wave and
341 // then enters a new wave to drain it; clearing here
342 // would erase that pre-staged entry. The panic-discard
343 // path in `BatchGuard::drop` clears it explicitly.
344
345 // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
346 // CoreState::currently_firing — defensive clear there.
347 // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
348 // wave-scoped — cleared so the next wave can fire cleanups
349 // again.
350 self.invalidate_hooks_fired_this_wave.clear();
351 // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
352 // `pending_wipes` are intentionally NOT cleared here. They
353 // follow the same discipline as `deferred_handle_releases` /
354 // `pending_notify`:
355 // - SUCCESS path (`BatchGuard::drop` non-panic): drained by
356 // `Core::drain_deferred` AFTER `clear_wave_state` runs,
357 // then fired lock-released by `Core::fire_deferred`.
358 // - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
359 // `std::mem::take`-and-dropped AFTER `clear_wave_state`
360 // runs (silently per D061 / D069).
361 // Clearing here would race the success path: queued sink fires
362 // / cleanup hooks / wipes would be erased BEFORE
363 // `drain_deferred` could take them.
364 }
365}
366
367/// Run a closure with mutable access to this thread's [`WaveState`].
368///
369/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
370/// for sites that touch ONE field. For sites that interleave state lock
371/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
372/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
373/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
374/// pattern).
375///
376/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
377/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
378/// on nested borrow. The same discipline that the prior
379/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
380/// holding cross_partition) carries over.
381pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
382 WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
383}
384
385/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
386/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
387/// outermost owning entry. Mirrors the pre-existing tier3 defensive
388/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
389/// propagating stale entries from a prior panicked-mid-wave test.
390///
391/// The retain-holding fields (`wave_cache_snapshots` /
392/// `deferred_handle_releases`) MUST already be empty by construction at
393/// outermost wave entry — outermost `BatchGuard::drop` always drains
394/// them on both success and panic paths. If they're non-empty here it
395/// indicates a prior wave bypassed `BatchGuard::drop`; in that case
396/// the next BatchGuard's outermost drop will eventually drain them.
397fn wave_state_clear_outermost() {
398 with_wave_state(|ws| {
399 // /qa F4 (2026-05-10): debug_assert that retain-holding fields
400 // are empty at outermost wave start. The invariant claim is
401 // "outermost BatchGuard::drop drains them on both success and
402 // panic paths, so they're empty before the next wave starts."
403 // If a panic path EVER bypasses the drain (today: not reachable
404 // because BatchGuard::drop is robust against panicking sinks via
405 // catch_unwind), this assert catches it in tests immediately
406 // rather than letting stale entries leak into the next wave's
407 // drain (which would release Core-A's HandleIds via Core-B's
408 // binding under cross-Core same-thread sequential use).
409 debug_assert!(
410 ws.wave_cache_snapshots.is_empty(),
411 "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
412 outermost wave start ({} entries) — prior BatchGuard::drop \
413 bypassed the drain (would leak retains into next wave's \
414 binding). See /qa F4 (2026-05-10).",
415 ws.wave_cache_snapshots.len()
416 );
417 debug_assert!(
418 ws.deferred_handle_releases.is_empty(),
419 "wave_state_clear_outermost: deferred_handle_releases non-empty \
420 at outermost wave start ({} entries) — prior BatchGuard::drop \
421 bypassed the drain. See /qa F4 (2026-05-10).",
422 ws.deferred_handle_releases.len()
423 );
424 debug_assert!(
425 ws.pending_notify.is_empty(),
426 "wave_state_clear_outermost: pending_notify non-empty at \
427 outermost wave start ({} entries) — prior BatchGuard::drop \
428 bypassed the drain. See /qa F4 (2026-05-10).",
429 ws.pending_notify.len()
430 );
431 ws.pending_auto_resolve.clear();
432 ws.pending_pause_overflow.clear();
433 // Sub-slice 2: pending_fires is intentionally NOT cleared here.
434 // Pre-Sub-slice-2 it lived on CoreState and survived between
435 // waves; load-bearing for `Core::resume`'s default-mode
436 // consolidated-fire pattern, which inserts into pending_fires
437 // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
438 // false at that moment) and then calls `run_wave_for(node_id)`
439 // — `run_wave_for` enters a NEW outermost wave whose drain must
440 // pick up that pre-staged pending_fires entry. Clearing here
441 // would erase it.
442 //
443 // pending_fires holds no retains, so a stale entry from a
444 // prior panicked-mid-wave test that bypassed BatchGuard::drop
445 // would leak as a spurious fire on the next wave on the same
446 // thread (no refcount damage). The panic-discard path in
447 // BatchGuard::drop and the wave-success drain together
448 // guarantee pending_fires is empty by wave end; relying on
449 // that invariant matches the pre-refactor lifecycle.
450 //
451 // Intentionally NOT clearing wave_cache_snapshots /
452 // deferred_handle_releases / pending_notify here — those hold
453 // retains and need a binding to release. Documented invariant:
454 // they're empty by outermost wave start.
455
456 // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
457 // defensively clear the OnInvalidate dedup set on outermost-wave
458 // entry. Holds no retains; a stale entry from a prior
459 // panicked-mid-wave test that bypassed BatchGuard::drop would
460 // only suppress the OnInvalidate cleanup hook for that node on
461 // the next wave (no refcount damage). Clearing matches the
462 // tier3 defensive-clear precedent.
463 //
464 // `currently_firing` was reverted to CoreState (per /qa F2 — the
465 // per-thread placement silently bypassed the cross-thread P13
466 // partition-migration check); its defensive clear lives in
467 // `CoreState::clear_wave_state` (which BatchGuard::drop runs
468 // wave-end on both success and panic paths).
469 ws.invalidate_hooks_fired_this_wave.clear();
470 // Intentionally NOT clearing deferred_flush_jobs /
471 // deferred_cleanup_hooks / pending_wipes here — by invariant
472 // they're empty at outermost wave start (drained on success
473 // by drain_deferred → fire_deferred; drained on panic by
474 // BatchGuard::drop's panic branch). Pre-clearing would race a
475 // hypothetical wave that staged into them OUTSIDE in_tick
476 // (none does today, but matching the deferred_handle_releases
477 // / pending_notify discipline keeps the invariant uniform).
478 });
479}
480
481/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
482/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
483/// wave-scope rationale.
484fn tier3_check(node: NodeId) -> bool {
485 TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
486}
487
488/// Mark `node` as having emitted a tier-3 message in the current wave on
489/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
490fn tier3_mark(node: NodeId) {
491 TIER3_EMITTED_THIS_WAVE.with(|s| {
492 s.borrow_mut().insert(node);
493 });
494}
495
496/// Wave-end clear of the per-thread tier3 tracker. Called from the
497/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
498/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
499/// invoke this — the outer wave is still in flight and inner-wave marks
500/// are part of the outer wave's Slice G coalescing state.
501fn tier3_clear() {
502 TIER3_EMITTED_THIS_WAVE.with(|s| {
503 s.borrow_mut().clear();
504 });
505}
506
/// Deferred sink-fire jobs collected during `flush_notifications`. Each
/// entry pairs a snapshot of the sink Arcs to fire with the messages to
/// deliver to them — one entry per (node × phase) cell with non-empty
/// content. Stored in [`WaveState::deferred_flush_jobs`] (moved there
/// from `CoreState` in Q-beyond Sub-slice 3); taken by
/// `Core::drain_deferred` and fired lock-released by
/// `Core::fire_deferred`.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
512
513/// Lock-released drain payload of the wave's BatchGuard:
514/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
515/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
516/// Sliced into a type alias to satisfy `clippy::type_complexity`.
517pub(crate) type WaveDeferred = (
518 DeferredJobs,
519 Vec<HandleId>,
520 Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
521 Vec<crate::handle::NodeId>,
522);
523
/// One subscriber-snapshot epoch within a node's wave-end notification
/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
/// for the node in a wave, and a fresh batch is opened whenever the node's
/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
/// existing sink unsubscribes, or a handshake-time panic evicts an
/// orphaned sink). All messages within one batch flush to the same sink
/// list — the snapshot taken when the batch opened, frozen against
/// subsequent revision bumps.
pub(crate) struct PendingBatch {
    /// `NodeRecord::subscribers_revision` value at the moment this batch
    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
    /// open-fresh-batch on every push.
    pub(crate) snapshot_revision: u64,
    /// Subscriber snapshot taken when this batch opened. Every entry in
    /// `messages` flushes to exactly this list, regardless of later
    /// subscribe / unsubscribe activity at the node.
    pub(crate) sinks: Vec<Sink>,
    /// Messages queued for the node while this snapshot was current.
    pub(crate) messages: Vec<Message>,
}
540
/// Per-node wave-end notification queue, structured as one or more
/// subscriber-snapshot epochs (`batches`). The common case (no
/// mid-wave subscribe / unsubscribe at this node) keeps a single
/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
///
/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
/// `(sinks, messages)` pair per node — the snapshot froze on first
/// `queue_notify` and was reused for every subsequent emit to the same
/// node in the wave. That caused the documented late-subscriber +
/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
/// between two emits to the same node was invisible to the second
/// emit's flush slice. The revision-tracked batch list resolves it —
/// late subs land in a fresh batch that frozenly carries them, while
/// pre-subscribe batches retain their original snapshot so the new
/// sub doesn't double-receive earlier emits via flush AND handshake.
pub(crate) struct PendingPerNode {
    /// The node's subscriber-snapshot epochs for the current wave, one
    /// `PendingBatch` per epoch. `iter_messages` / `iter_messages_mut`
    /// walk these front to back.
    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}
559
560impl PendingPerNode {
561 /// Iterate every queued message for this node across all batches in
562 /// arrival order. Used by R1.3.3.a invariant assertions and the
563 /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
564 /// reason about wave-content per node, not per batch.
565 pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
566 self.batches.iter().flat_map(|b| b.messages.iter())
567 }
568
569 /// Mutable counterpart for `iter_messages`. Used by
570 /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
571 /// entries to Data when a wave detects a multi-emit case after the
572 /// fact.
573 pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
574 self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
575 }
576}
577
578/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
579///
/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
581/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
582/// `set_deps(N, ...)` from inside N's own fn-fire with
583/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
584/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
585/// a different dep ordering than Phase-3's `tracked` storage.
586///
587/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
588/// callbacks like `project_each` / `predicate_each`). Drop fires even
589/// on panic, so the stack stays balanced under user-fn unwinds.
590///
591/// Membership semantics (NOT strict LIFO): the only consumer of
592/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
593/// `contains(&n)` — a set-membership test. Drop pops the right-most
594/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
595/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
596/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
597/// physical order of the remaining As may not match construction order,
598/// but membership is preserved. If a future call site needs strict LIFO
599/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
600/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
601pub(crate) struct FiringGuard {
602 core: Core,
603 node_id: NodeId,
604 /// Phase H+ option (d) /qa N1(a) widened variant (2026-05-09):
605 /// whether this guard participates in the per-thread
606 /// `IN_PRODUCER_BUILD` accounting. Producer-pattern operator
607 /// activations (`zip` / `concat` / `race` / `take_until` /
608 /// `switch_map` / `exhaust_map` / `concat_map` / `merge_map`
609 /// — all `is_producer()`) DO participate: they SUPPRESS the H+
610 /// check during their build/project closure because those
611 /// closures legitimately subscribe to upstream sources
612 /// cross-partition (operator-internal activation-time setup,
613 /// not user-fn re-entry). Refactoring those operators to defer
614 /// inner subscribes to wave-end is the broader Phase H+ STRICT
615 /// variant scope; the limited variant carves them out via this
616 /// flag. Derived / dynamic / state user-fn fires do NOT
617 /// participate — the H+ check applies to them under the
618 /// "held non-empty AND not in producer build" gate (see
619 /// `crate::node::held_partitions` module docstring).
620 ///
621 /// **INVARIANT:** the value is captured at `FiringGuard::new`
622 /// from `NodeRecord::is_producer()` for the firing node and
623 /// must NEVER be re-derived at `Drop` time. `is_producer` is
624 /// stable for a node's lifetime per the current registration
625 /// API (a node's kind cannot change once registered), but a
626 /// future contributor adding mutability MUST honor this snapshot
627 /// to keep the `producer_build_enter` / `producer_build_exit`
628 /// pair balanced. A debug_assert in Drop verifies the snapshot
629 /// still matches when assertions are enabled.
630 is_producer_build: bool,
631}
632
633impl FiringGuard {
634 pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
635 // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
636 // CoreState (cross-thread visible, restoring the D091 P13 check).
637 // Push under the SAME state lock scope as the is_producer read —
638 // this also closes /qa F3 (the panic-window between
639 // with_wave_state push and Self construction is gone; a panic in
640 // `lock_state` itself can't leak the push because no push has
641 // run yet, and the push + is_producer read happen atomically
642 // under the state lock).
643 let is_producer = {
644 let mut s = core.lock_state();
645 s.currently_firing.push(node_id);
646 s.nodes
647 .get(&node_id)
648 .is_some_and(crate::node::NodeRecord::is_producer)
649 };
650 // Construct Self FIRST (capture the cached `is_producer_build`
651 // snapshot in the struct). Then call `producer_build_enter()`
652 // as a separate step. If a future contributor adds fallible
653 // / panicking code between Self construction and the enter
654 // call, the panic still leaves Self abandoned (no Drop runs
655 // because Self isn't bound to a name yet) and the
656 // `producer_build_enter` call hasn't been made — so no
657 // imbalance. This is the panic-safe ordering per /qa A4.
658 //
659 // **Cleanup-on-panic path:** a panic AFTER Self is constructed
660 // and bound to a named guard runs `Drop for FiringGuard`, which
661 // pops `currently_firing` under the state lock. A panic between
662 // the `lock_state` block above and Self construction would leak
663 // the push; in practice `lock_state` is parking_lot
664 // (no-panic-on-unpoisoned) and `core.clone()` is infallible, so
665 // the panic-window is empty for current code. The defensive
666 // clear in `CoreState::clear_wave_state` (called from
667 // BatchGuard::drop wave-end) catches any hypothetical leak.
668 let guard = Self {
669 core: core.clone(),
670 node_id,
671 is_producer_build: is_producer,
672 };
673 if is_producer {
674 crate::node::producer_build_enter();
675 }
676 guard
677 }
678}
679
impl Drop for FiringGuard {
    /// Undo what `FiringGuard::new` did: remove this guard's `node_id`
    /// from `currently_firing` under the state lock, then — only if the
    /// construction-time snapshot `is_producer_build` is true — call
    /// `producer_build_exit()` after the lock has been released.
    fn drop(&mut self) {
        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState. The pop happens under the SAME state-lock scope as
        // the debug invariant check, mirroring the construction-side push.
        {
            let mut s = self.core.lock_state();
            #[cfg(debug_assertions)]
            {
                // INVARIANT (debug builds only, per /qa A5):
                // `is_producer_build` must match the node's current
                // `is_producer()` at Drop time. If a future refactor
                // introduces post-construction node-kind mutation, this
                // fails loudly — the `producer_build_enter` /
                // `producer_build_exit` pair would otherwise become
                // unbalanced.
                let now_producer = s
                    .nodes
                    .get(&self.node_id)
                    .is_some_and(crate::node::NodeRecord::is_producer);
                // A node removed mid-fire makes `is_some_and(...)` report
                // false — a benign asymmetry, so the check only runs while
                // the node still exists. The real concern is an EXISTING
                // node whose producer-ness changed since construction.
                if s.nodes.contains_key(&self.node_id) {
                    debug_assert_eq!(
                        self.is_producer_build,
                        now_producer,
                        "FiringGuard invariant violation: node {:?} was {} at \
                         construction but is {} at Drop. The is_producer flag \
                         must be stable for a node's lifetime; see FiringGuard \
                         struct docstring.",
                        self.node_id,
                        if self.is_producer_build {
                            "is_producer=true"
                        } else {
                            "is_producer=false"
                        },
                        if now_producer {
                            "is_producer=true"
                        } else {
                            "is_producer=false"
                        },
                    );
                }
            }
            // Remove the right-most entry equal to `node_id` (membership
            // semantics — not strict LIFO; `swap_remove` moves the last
            // element into the vacated slot). If no entry is found, an
            // external rebalance already removed it — silent no-op
            // (panic-in-Drop is poison).
            if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
                s.currently_firing.swap_remove(pos);
            }
        }
        // Phase H+ pair: call `producer_build_exit` IFF `new` called
        // `producer_build_enter` (both keyed on the same stored flag).
        // Done after the state-lock scope above has closed, so the exit
        // bookkeeping does not run inside the lock's critical section.
        if self.is_producer_build {
            crate::node::producer_build_exit();
        }
    }
}
748
749/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
750/// uninitialized or the contained type doesn't match `T` — both are
751/// invariant violations for any `fire_op_*` helper that should only be
752/// called from `fire_operator`'s match arm for the matching variant.
753fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
754 s.require_node(node_id)
755 .op_scratch
756 .as_ref()
757 .expect("op_scratch slot uninitialized for operator node")
758 .as_any_ref()
759 .downcast_ref::<T>()
760 .expect("op_scratch type mismatch")
761}
762
763/// Mutable borrow of the per-operator scratch slot. Same invariants as
764/// [`scratch_ref`].
765fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
766 s.require_node_mut(node_id)
767 .op_scratch
768 .as_mut()
769 .expect("op_scratch slot uninitialized for operator node")
770 .as_any_mut()
771 .downcast_mut::<T>()
772 .expect("op_scratch type mismatch")
773}
774
775impl Core {
776 // -------------------------------------------------------------------
777 // Wave entry + drain
778 // -------------------------------------------------------------------
779
780 /// Wave entry. The caller passes a closure that performs the wave's
781 /// triggering operation (`commit_emission`, `terminate_node`, etc.).
782 /// The closure runs lock-released; closure-internal Core methods
783 /// acquire the state lock as they go.
784 ///
785 /// **Implementation:** delegates to [`Self::begin_batch`] for the
786 /// wave's RAII lifecycle. The returned `BatchGuard` holds the
787 /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
788 /// emits block; same-thread re-entry passes through), claims `in_tick`,
789 /// and on drop runs the drain + flush + sink-fire phases — OR, if the
790 /// closure panicked, the panic-discard path that restores cache
791 /// snapshots and clears in_tick. This unification gives `run_wave` the
792 /// same panic-safety guarantee as the user-facing `Core::batch`.
793 ///
794 /// **Re-entrance:** a closure invoked from inside another wave — the
795 /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
796 /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
797 /// The outer wave's drain picks up the inner closure's queued work.
798 ///
799 /// **Lock-release discipline (Slice A close, M1):** all binding-side
800 /// callbacks except the subscribe-time handshake fire lock-released.
801 /// Sinks that re-enter Core run a nested wave; user fns that re-enter
802 /// Core run a nested wave; custom-equals oracles that re-enter Core
803 /// run a nested wave. Cross-thread emits block at `wave_owner` until
804 /// the in-flight wave's drain completes — preserving the user-facing
805 /// "emit returning means subscribers have observed" contract.
806 /// Wave entry with a known `seed` node. Acquires only the partitions
807 /// transitively touched from `seed` (downstream cascade via
808 /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
809 /// current partition. The canonical Y1 parallelism win for per-seed
810 /// entry points (`Core::emit`, `Core::subscribe`'s activation,
811 /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
812 /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
813 /// push-on-subscribe).
814 ///
815 /// Two threads with disjoint touched-partition sets run truly
816 /// parallel — they don't block each other on Core-global locks.
817 /// Same-thread re-entry passes through each partition's
818 /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
819 /// partition (or any overlapping touched-partition set) serialize
820 /// per the per-partition `wave_owner` mutex, preserving the
821 /// "emit returning means subscribers have observed" contract.
822 ///
823 /// Slice Y1 / Phase E (2026-05-08).
824 pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
825 where
826 F: FnOnce(&Self),
827 {
828 let _guard = self.begin_batch_for(seed);
829 op(self);
830 }
831
832 /// Drain retains held by `wave_cache_snapshots` and return them so
833 /// the caller can release them lock-released. Called from the
834 /// wave-success path in [`BatchGuard::drop`].
835 ///
836 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
837 /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
838 /// drain-and-release-lock-released discipline (introduced as /qa A1
839 /// fix 2026-05-09 against the prior cross_partition mutex) carries
840 /// over: caller drains under WaveState borrow + state lock, releases
841 /// after both are dropped — `release_handle` may re-enter Core via
842 /// finalizers and re-entry under either guard would deadlock /
843 /// double-borrow.
844 #[must_use]
845 pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
846 if ws.wave_cache_snapshots.is_empty() {
847 return Vec::new();
848 }
849 std::mem::take(&mut ws.wave_cache_snapshots)
850 .into_values()
851 .collect()
852 }
853
854 /// Restore cache slots from `wave_cache_snapshots` and clear the map.
855 /// Called from the wave-abort path in `BatchGuard::drop` (panic).
856 ///
857 /// For each snapshotted node:
858 ///
859 /// 1. Read the current cache (the in-flight new value).
860 /// 2. Set `cache = old_handle` (the snapshot's retained value).
861 /// 3. Release the now-unowned current cache handle.
862 ///
863 /// Returns the list of "current" handles to release outside the lock.
864 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
865 /// to per-thread `WaveState`; signature takes both `s` (for cache
866 /// slots) and `ws` (for the snapshots map).
867 pub(crate) fn restore_wave_cache_snapshots(
868 &self,
869 s: &mut CoreState,
870 ws: &mut WaveState,
871 ) -> Vec<HandleId> {
872 if ws.wave_cache_snapshots.is_empty() {
873 return Vec::new();
874 }
875 let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
876 let mut releases = Vec::with_capacity(snapshots.len());
877 for (node_id, old_handle) in snapshots {
878 let Some(rec) = s.nodes.get_mut(&node_id) else {
879 releases.push(old_handle);
880 continue;
881 };
882 let current = std::mem::replace(&mut rec.cache, old_handle);
883 if current != NO_HANDLE {
884 releases.push(current);
885 }
886 }
887 releases
888 }
889
    /// Drain pending fires until quiescent, then flush wave-end notifications
    /// to subscribers. Each fire iteration drops the state lock around the
    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
    ///
    /// Structure:
    ///
    /// 1. **Drain loop** — while `pending_fires` is non-empty, pick one
    ///    node (see `pick_next_fire`) and fire it via `fire_fn`. Whenever
    ///    no fires are pending but pause-overflow entries are queued, those
    ///    are turned into ERRORs first (R1.3.8.c).
    /// 2. **Auto-resolve sweep** — queue `Message::Resolved` for every
    ///    `pending_auto_resolve` candidate whose `pending_notify` entry
    ///    holds no message of tier >= 3.
    /// 3. **Flush** — `flush_notifications`.
    ///
    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
    ///
    /// # Panics
    ///
    /// Panics when the number of drain iterations reaches the configured
    /// `max_batch_drain_iterations` cap (re-read every iteration).
    pub(crate) fn drain_and_flush(&self) {
        // Iteration counter for the drain cap (not a lock guard).
        let mut guard = 0u32;
        loop {
            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
            // queued pause-overflow ERRORs, synthesize them now. The
            // resulting ERROR cascade may add to pending_fires (children
            // settling their terminal state), so we loop back to drain.
            //
            // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
            // and pending_pause_overflow both live on per-thread
            // WaveState. State lock no longer required for either read.
            // The entries are moved OUT of WaveState here so the borrow has
            // ended before the binding hook / `Core::error` run below.
            let synth_pending = with_wave_state(|ws| {
                if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
                    std::mem::take(&mut ws.pending_pause_overflow)
                } else {
                    Vec::new()
                }
            });
            for entry in synth_pending {
                // Lock-released call to the binding hook. Default impl
                // returns None — the binding has opted out of R1.3.8.c
                // and we fall back to silent-drop + ResumeReport.dropped.
                // The last argument is `lock_held_ns` divided by 1_000_000
                // (nanoseconds -> presumably milliseconds; confirm against
                // the hook's signature).
                let handle = self.binding.synthesize_pause_overflow_error(
                    entry.node_id,
                    entry.dropped_count,
                    entry.configured_max,
                    entry.lock_held_ns / 1_000_000,
                );
                if let Some(h) = handle {
                    // Re-enter Core::error to terminate the node and
                    // cascade. We're inside a wave (`in_tick = true`),
                    // so error() gets a non-owning batch guard — it
                    // doesn't try to start its own drain. The cascade
                    // queues into our outer drain via pending_fires
                    // and pending_notify.
                    self.error(entry.node_id, h);
                }
            }

            // Pick next fire under a short lock. Also re-read the configured
            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
            // without restarting waves mid-flight.
            //
            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
            // on per-thread WaveState; pick_next_fire takes both state and
            // WaveState. The pending_size diagnostic and emptiness check
            // also read WaveState. Borrow scopes are split: WaveState
            // borrow drops before fire_fn runs (which re-borrows WaveState
            // via fire_regular / fire_operator).
            let (next, cap, pending_size) = {
                let s = self.lock_state();
                let cap = s.max_batch_drain_iterations;
                let (next, pending_size) = with_wave_state(|ws| {
                    if ws.pending_fires.is_empty() {
                        return (None, 0);
                    }
                    let size = ws.pending_fires.len();
                    let next = Self::pick_next_fire(&s, ws);
                    (next, size)
                });
                // Quiescent: nothing left to fire, leave the drain loop.
                // NOTE(review): if the ERROR cascade above queued NEW
                // pause-overflow entries without adding any pending fire,
                // this exit leaves them in `pending_pause_overflow`
                // unsynthesized for this drain — confirm that is intended
                // or handled elsewhere.
                if pending_size == 0 {
                    break;
                }
                (next, cap, pending_size)
            };
            // Runaway-drain protection: count the iteration and abort the
            // wave loudly once the cap is reached.
            guard += 1;
            assert!(
                guard < cap,
                "wave drain exceeded {cap} iterations \
                 (pending_fires={pending_size}). Most likely cause: a runtime \
                 cycle introduced by an operator that re-arms its own pending_fires \
                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
                 itself, or a fn that calls Core::emit on a node whose fn fires \
                 the original node again). Structural cycles via set_deps are \
                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
                 only with concrete evidence the workload needs more iterations."
            );
            // `pick_next_fire` returns `Some` whenever pending_fires is
            // non-empty, so this `else` is purely defensive.
            let Some(next) = next else { break };
            // fire_fn manages its own locking around invoke_fn.
            self.fire_fn(next);
        }
        // Auto-resolve sweep: nodes registered in pending_auto_resolve
        // by the RESOLVED child propagation need a Resolved if they didn't
        // fire and settle via their own commit_emission. Check pending_notify
        // for each candidate — if it has an entry containing no tier-3+
        // message, the node never settled and needs auto-Resolved. Route
        // through queue_notify so paused nodes get the Resolved into their
        // pause buffer.
        let mut s = self.lock_state();
        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
        // and pending_notify both live on per-thread WaveState. /qa A5
        // fix (2026-05-09): every WaveState access below is its own short
        // `with_wave_state` closure, so no borrow is live when
        // `queue_notify` runs. `queue_notify` re-borrows WaveState for
        // `pending_pause_overflow.push` / `pending_notify` writes —
        // re-entrance on RefCell::borrow_mut would panic. The candidate
        // list is therefore moved out before the loop starts.
        let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
        for node_id in candidates {
            let needs_resolve = with_wave_state(|ws| {
                ws.pending_notify
                    .get(&node_id)
                    .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
            });
            if needs_resolve {
                self.queue_notify(&mut s, node_id, Message::Resolved);
            }
        }
        // Final flush phase — populates deferred_flush_jobs
        // from pending_notify (already carries per-node sink snapshots).
        self.flush_notifications(&mut s);
    }
1007
1008 /// Pick a node whose **transitive** upstream is fully settled.
1009 ///
1010 /// A node `id` is ready to fire iff none of its transitive ancestors
1011 /// (via the `deps` chain) is currently in `pending_fires`. Diamond
1012 /// glitch prevention requires transitive (not immediate-only) reasoning
1013 /// — for graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
1014 /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
1015 /// is added. An immediate-only readiness check would mistakenly pick
1016 /// `F` because neither `D` nor `E` are in `pending_fires` at that
1017 /// moment, firing `F` against the stale activation-time `D` cache and
1018 /// again later when `D` actually settles. Transitive upstream walk
1019 /// catches `B`/`C` pending and correctly defers `F`.
1020 ///
1021 /// Cost: O(V) per candidate; worst case O(N·V) per pick. The existing
1022 /// porting-deferred entry on `pick_next_fire` perf flagged this as a
1023 /// future per-node `unresolved_dep_count` refactor.
1024 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
1025 /// per-thread WaveState. Caller passes `&WaveState` alongside
1026 /// `&CoreState` so the borrow scopes stay disjoint and visible.
1027 fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
1028 for &id in &ws.pending_fires {
1029 if Self::transitive_upstream_settled(s, ws, id) {
1030 return Some(id);
1031 }
1032 }
1033 // Cycle / no eligible candidate (every node has an upstream pending,
1034 // possibly via a cycle path): pick any so the drain guard advances.
1035 // The drain-iteration cap will catch genuine cycles.
1036 ws.pending_fires.iter().copied().next()
1037 }
1038
1039 fn transitive_upstream_settled(s: &CoreState, ws: &WaveState, node_id: NodeId) -> bool {
1040 let rec = s.require_node(node_id);
1041 if rec.dep_count() == 0 {
1042 return true;
1043 }
1044 let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
1045 let mut stack: Vec<NodeId> = rec.dep_ids_vec();
1046 while let Some(id) = stack.pop() {
1047 if !visited.insert(id) {
1048 continue;
1049 }
1050 if ws.pending_fires.contains(&id) {
1051 return false;
1052 }
1053 if let Some(r) = s.nodes.get(&id) {
1054 for dep_id in r.dep_ids() {
1055 if !visited.contains(&dep_id) {
1056 stack.push(dep_id);
1057 }
1058 }
1059 }
1060 }
1061 true
1062 }
1063
1064 /// Wave drain entry point. Dispatches via `rec.op` to either the
1065 /// regular fn-fire path ([`Self::fire_regular`]) or the operator
1066 /// dispatch ([`Self::fire_operator`]).
1067 pub(crate) fn fire_fn(&self, node_id: NodeId) {
1068 let op = {
1069 let s = self.lock_state();
1070 s.nodes.get(&node_id).and_then(|r| r.op)
1071 };
1072 match op {
1073 Some(operator_op) => self.fire_operator(node_id, operator_op),
1074 None => {
1075 // State / Derived / Dynamic / Producer all dispatch via fn_id.
1076 self.fire_regular(node_id);
1077 }
1078 }
1079 }
1080
1081 /// Fire a node's fn lock-released around `invoke_fn`.
1082 ///
1083 /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
1084 /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
1085 /// or stateless.
1086 ///
1087 /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
1088 /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
1089 /// wave — the in_tick gate composes naturally because nested calls
1090 /// observe `in_tick = true` and skip their own drain.
1091 ///
1092 /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
1093 /// decide between Noop+RESOLVED, single Data, or Batch.
1094 ///
1095 /// Phase 4: commit emissions. Single Data goes through
1096 /// `commit_emission` (with equals substitution). Batch emissions are
1097 /// processed in sequence — Data via `commit_emission_verbatim` (no
1098 /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
1099 /// terminal cascade.
1100 #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
1101 fn fire_regular(&self, node_id: NodeId) {
1102 enum FireAction {
1103 None,
1104 SingleData(HandleId),
1105 Batch(SmallVec<[FnEmission; 2]>),
1106 }
1107
1108 // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
1109 // `has_fired_once` is captured here for the Slice E2 OnRerun gate
1110 // (Phase 1.5 below): the cleanup hook only fires when the fn has
1111 // run at least once already in this activation cycle.
1112 let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
1113 let s = self.lock_state();
1114 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1115 // on per-thread WaveState. Removed via with_wave_state — no
1116 // re-entry concern because only the immediate remove happens
1117 // under the borrow.
1118 with_wave_state(|ws| {
1119 ws.pending_fires.remove(&node_id);
1120 });
1121 let rec = s.require_node(node_id);
1122 // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
1123 // mode opts out of the gate per D011), or stateless.
1124 if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
1125 None
1126 } else {
1127 rec.fn_id.map(|fn_id| {
1128 let dep_batches: Vec<DepBatch> = rec
1129 .dep_records
1130 .iter()
1131 .map(|dr| DepBatch {
1132 data: dr.data_batch.clone(),
1133 prev_data: dr.prev_data,
1134 involved: dr.involved_this_wave,
1135 })
1136 .collect();
1137 (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
1138 })
1139 }
1140 };
1141 let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
1142 return;
1143 };
1144
1145 // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
1146 // the fn has fired at least once in this activation cycle, fire its
1147 // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
1148 // local resources. First-fire is intentionally skipped — there is
1149 // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
1150 // cleanup re-entrance is not the A6 reentrancy concern (which
1151 // protects against `set_deps(self, ...)` from inside the in-flight
1152 // invoke_fn). Operator nodes never reach this path (`fire_regular`
1153 // is the fn-id branch of `fire_fn`; operators dispatch via
1154 // `fire_operator`), so cleanup hooks correctly only fire for fn-
1155 // shaped nodes (state / derived / dynamic / producer).
1156 if has_fired_once {
1157 self.binding
1158 .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
1159 }
1160
1161 // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
1162 // the FFI call only — Phase 3's lock-held state mutation is not part
1163 // of "currently firing" because set_deps would already block on the
1164 // state lock by then. Drop on the guard pops the stack even if
1165 // invoke_fn panics, keeping `currently_firing` balanced.
1166 let result = {
1167 let _firing = FiringGuard::new(self, node_id);
1168 self.binding.invoke_fn(node_id, fn_id, &dep_batches)
1169 };
1170
1171 // Phase 3: apply result under the lock — defensive terminal check
1172 // (a sibling cascade may have terminated this node during phase 2).
1173 let action: FireAction = {
1174 let mut s = self.lock_state();
1175 // Defensive: node may have terminated mid-phase-2 via a sibling
1176 // cascade (a fn that re-entered `Core::error` on a path that
1177 // cascaded here). If so, release any payload handles and no-op.
1178 if s.require_node(node_id).terminal.is_some() {
1179 match &result {
1180 FnResult::Data { handle, .. } => {
1181 self.binding.release_handle(*handle);
1182 }
1183 FnResult::Batch { emissions, .. } => {
1184 for em in emissions {
1185 match em {
1186 FnEmission::Data(h) | FnEmission::Error(h) => {
1187 self.binding.release_handle(*h);
1188 }
1189 FnEmission::Complete => {}
1190 }
1191 }
1192 }
1193 FnResult::Noop { .. } => {}
1194 }
1195 return;
1196 }
1197 let rec = s.require_node_mut(node_id);
1198 rec.has_fired_once = true;
1199 if is_dynamic {
1200 let tracked = match &result {
1201 FnResult::Data { tracked, .. }
1202 | FnResult::Noop { tracked }
1203 | FnResult::Batch { tracked, .. } => tracked.clone(),
1204 };
1205 if let Some(t) = tracked {
1206 rec.tracked = t.into_iter().collect();
1207 }
1208 }
1209 match result {
1210 FnResult::Noop { .. } => {
1211 // Slice G: skip Resolved if a prior emission in the same
1212 // wave already queued tier-3 (would violate R1.3.3.a).
1213 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
1214 // lives on per-thread WaveState. Borrow scoped to the
1215 // tier3 read so queue_notify (which re-borrows
1216 // WaveState) doesn't double-borrow.
1217 let already_dirty = s.require_node(node_id).dirty;
1218 let already_tier3 = with_wave_state(|ws| {
1219 ws.pending_notify
1220 .get(&node_id)
1221 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1222 });
1223 if already_dirty && !already_tier3 {
1224 self.queue_notify(&mut s, node_id, Message::Resolved);
1225 }
1226 FireAction::None
1227 }
1228 FnResult::Data { handle, .. } => FireAction::SingleData(handle),
1229 FnResult::Batch { emissions, .. } if emissions.is_empty() => {
1230 // Empty Batch is equivalent to Noop — settle with
1231 // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
1232 // skip if a prior emission already queued tier-3.
1233 // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
1234 // arm above for the WaveState borrow scope rationale.
1235 let already_dirty = s.require_node(node_id).dirty;
1236 let already_tier3 = with_wave_state(|ws| {
1237 ws.pending_notify
1238 .get(&node_id)
1239 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1240 });
1241 if already_dirty && !already_tier3 {
1242 self.queue_notify(&mut s, node_id, Message::Resolved);
1243 }
1244 FireAction::None
1245 }
1246 FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
1247 }
1248 };
1249
1250 // Phase 4: commit emissions.
1251 match action {
1252 FireAction::None => {}
1253 // Single Data — equals substitution applies (R1.3.2).
1254 FireAction::SingleData(handle) => {
1255 self.commit_emission(node_id, handle);
1256 }
1257 // Batch — process in sequence. No equals substitution
1258 // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
1259 FireAction::Batch(emissions) => {
1260 self.commit_batch(node_id, emissions);
1261 }
1262 }
1263 }
1264
1265 /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
1266 /// through `commit_emission_verbatim` (no equals substitution per
1267 /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
1268 /// cascade per R1.3.4; processing stops at the first terminal and
1269 /// remaining handles are released (R1.3.4.a: no further messages
1270 /// after terminal).
1271 fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
1272 let mut iter = emissions.into_iter();
1273 for em in iter.by_ref() {
1274 match em {
1275 FnEmission::Data(handle) => {
1276 self.commit_emission_verbatim(node_id, handle);
1277 }
1278 FnEmission::Complete => {
1279 self.complete(node_id);
1280 break;
1281 }
1282 FnEmission::Error(handle) => {
1283 self.error(node_id, handle);
1284 break;
1285 }
1286 }
1287 }
1288 // Release handles from any emissions after the terminal break.
1289 for em in iter {
1290 match em {
1291 FnEmission::Data(h) | FnEmission::Error(h) => {
1292 self.binding.release_handle(h);
1293 }
1294 FnEmission::Complete => {}
1295 }
1296 }
1297 }
1298
1299 // -------------------------------------------------------------------
1300 // Emission commit — equals-substitution lives here
1301 // -------------------------------------------------------------------
1302
1303 /// Apply a node's emission. `&self`-only; brackets the equals check
1304 /// around a lock release so `BindingBoundary::custom_equals` can re-enter
1305 /// Core safely.
1306 ///
1307 /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
1308 /// equals_mode + old cache handle.
1309 ///
1310 /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
1311 /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
1312 /// crosses to the binding's `custom_equals` oracle, which may re-enter
1313 /// Core.
1314 ///
1315 /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
1316 /// pending_notify (which snapshots subscribers on first touch),
1317 /// propagate to children.
1318 // Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
1319 // function is already a multi-phase wave-engine routine and breaking
1320 // out the four phases would obscure the lock-discipline.
1321 #[allow(clippy::too_many_lines)]
1322 pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
1323 assert!(
1324 new_handle != NO_HANDLE,
1325 "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1326 );
1327
1328 // Phase 1: terminal short-circuit + snapshot equals/cache.
1329 let snapshot = {
1330 let s = self.lock_state();
1331 let rec = s.require_node(node_id);
1332 if rec.terminal.is_some() {
1333 drop(s);
1334 self.binding.release_handle(new_handle);
1335 return;
1336 }
1337 (rec.cache, rec.equals)
1338 };
1339 let (old_handle, equals_mode) = snapshot;
1340
1341 // Slice G (2026-05-07): R1.3.2.d says equals substitution only
1342 // fires for SINGLE-DATA waves at one node. Detect "this is a
1343 // subsequent emit in the same wave at this node" via the
1344 // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
1345 // (D1 patch, 2026-05-09 — moved off per-partition state to be
1346 // robust against mid-wave cross-thread `set_deps` partition
1347 // splits). If set → multi-emit wave: skip equals, queue Data
1348 // verbatim, retroactively rewrite any prior Resolved (queued by
1349 // an earlier same-value emit's equals match) to Data using the
1350 // wave-start cache snapshot. Outside batch / first emit:
1351 // standard per-emit equals path. Thread-local lookup is
1352 // ~5ns and lock-free.
1353 let is_subsequent_emit_in_wave = tier3_check(node_id);
1354
1355 if is_subsequent_emit_in_wave {
1356 // Multi-emit wave detected. Skip equals, queue Data verbatim.
1357 // Also rewrite any prior Resolved entries to Data using the
1358 // wave-start cache snapshot.
1359 self.rewrite_prior_resolved_to_data(node_id);
1360 self.commit_emission_verbatim(node_id, new_handle);
1361 return;
1362 }
1363
1364 // Phase 2: equals check (lock-released for Custom).
1365 let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
1366
1367 // Phase 3: apply emission under the lock. Defensive terminal
1368 // re-check — a concurrent cascade between phase 2 and phase 3
1369 // could have terminated the node.
1370 let mut s = self.lock_state();
1371 if s.require_node(node_id).terminal.is_some() {
1372 drop(s);
1373 self.binding.release_handle(new_handle);
1374 return;
1375 }
1376
1377 // R1.3.1.a condition (b): synthesize DIRTY only if node not already
1378 // dirty from an earlier emission in the same wave.
1379 let already_dirty = s.require_node(node_id).dirty;
1380 s.require_node_mut(node_id).dirty = true;
1381 if !already_dirty {
1382 self.queue_notify(&mut s, node_id, Message::Dirty);
1383 }
1384
1385 if is_data {
1386 // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
1387 // re-entry from a `custom_equals` oracle that called back into
1388 // `Core::emit` on this same node during phase 2's lock-released
1389 // equals check could have advanced the cache between phase 1's
1390 // snapshot (`old_handle`) and this point.
1391 let current_cache = s.require_node(node_id).cache;
1392 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1393 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1394 // in_tick lives on CoreState (cross-Core isolation requires
1395 // per-Core placement, not per-thread). Read in_tick under the
1396 // already-held state lock; snapshot-take via with_wave_state.
1397 let in_tick = s.in_tick;
1398 let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
1399 use std::collections::hash_map::Entry;
1400 with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1401 Entry::Vacant(slot) => {
1402 slot.insert(current_cache);
1403 true
1404 }
1405 Entry::Occupied(_) => false,
1406 })
1407 } else {
1408 false
1409 };
1410 s.require_node_mut(node_id).cache = new_handle;
1411 if current_cache != NO_HANDLE && !snapshot_taken {
1412 self.binding.release_handle(current_cache);
1413 }
1414 // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
1415 // buffer if the node opted in. RESOLVED entries are NOT
1416 // buffered (canonical "DATA only").
1417 self.push_replay_buffer(&mut s, node_id, new_handle);
1418 // Slice G (D1 patch, 2026-05-09): mark this node as having
1419 // emitted tier-3 in this wave on the per-thread tracker.
1420 tier3_mark(node_id);
1421 self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1422 // Propagate to children
1423 let child_ids: Vec<NodeId> = s
1424 .children
1425 .get(&node_id)
1426 .map(|c| c.iter().copied().collect())
1427 .unwrap_or_default();
1428 for child_id in child_ids {
1429 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1430 if let Some(idx) = dep_idx {
1431 self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1432 }
1433 }
1434 } else {
1435 // RESOLVED: handle unchanged. Don't release; old still in use.
1436 // Slice G: snapshot cache so a subsequent same-wave emit can
1437 // rewrite this Resolved to Data using the snapshot.
1438 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1439 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1440 // in_tick lives on CoreState — read under held state lock.
1441 let current_cache = s.require_node(node_id).cache;
1442 if s.in_tick && current_cache != NO_HANDLE {
1443 use std::collections::hash_map::Entry;
1444 with_wave_state(|ws| {
1445 if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
1446 self.binding.retain_handle(current_cache);
1447 slot.insert(current_cache);
1448 }
1449 });
1450 }
1451 // Slice G (D1 patch, 2026-05-09): mark this node as having
1452 // emitted tier-3 in this wave on the per-thread tracker.
1453 tier3_mark(node_id);
1454 self.queue_notify(&mut s, node_id, Message::Resolved);
1455 let child_ids: Vec<NodeId> = s
1456 .children
1457 .get(&node_id)
1458 .map(|c| c.iter().copied().collect())
1459 .unwrap_or_default();
1460 // /qa A7 fix (2026-05-09): collect auto-resolve inserts
1461 // during the loop and bulk-insert into pending_auto_resolve
1462 // under a SINGLE cross_partition acquire after the loop.
1463 // Pre-fix the loop acquired `cross_partition` once per
1464 // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
1465 // which is N mutex hops for an N-child cascade. Cannot
1466 // hoist to acquire-cps-before-loop because `queue_notify`
1467 // (called inside the loop) also acquires cross_partition
1468 // for `pending_pause_overflow.push` in the rare overflow
1469 // case — re-entrance on the non-reentrant Mutex would
1470 // self-deadlock.
1471 let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
1472 for child_id in child_ids {
1473 let already_involved = s.require_node(child_id).involved_this_wave;
1474 if !already_involved {
1475 {
1476 let child = s.require_node_mut(child_id);
1477 child.involved_this_wave = true;
1478 child.dirty = true;
1479 }
1480 self.queue_notify(&mut s, child_id, Message::Dirty);
1481 // Q2 (2026-05-09): pending_auto_resolve lives on
1482 // CrossPartitionState. Deferred to after-loop
1483 // bulk insert per the /qa A7 fix above.
1484 auto_resolve_inserts.push(child_id);
1485 }
1486 }
1487 // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
1488 // single WaveState borrow for the bulk-insert. queue_notify
1489 // above no longer holds the WaveState borrow by the time we
1490 // reach here, so this borrow is uncontested.
1491 if !auto_resolve_inserts.is_empty() {
1492 with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
1493 }
1494 }
1495 }
1496
1497 /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1498 /// emit arrives while a prior tier-3 message is still pending), rewrite
1499 /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1500 /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1501 /// Touches both `pending_notify` (immediate-flush path) and the per-node
1502 /// pause buffer (paused path).
1503 fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1504 let mut s = self.lock_state();
1505 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
1506 // and pending_notify both live on per-thread WaveState. Single
1507 // WaveState borrow handles both the snapshot lookup and the
1508 // pending_notify rewrite; the pause-buffer path uses the state
1509 // lock and is independent of WaveState.
1510 let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
1511 Some(h) if h != NO_HANDLE => h,
1512 // No snapshot available — the prior Resolved was queued without
1513 // a cache (sentinel pre-emit). Nothing to rewrite to; the
1514 // multi-emit case from sentinel is fine (verbatim Data path).
1515 _ => return,
1516 };
1517 let mut retains_needed = 0u32;
1518 // Pending_notify path. Walk all batches' messages — Slice-G
1519 // coalescing reasons about wave-content per node, not per-batch.
1520 with_wave_state(|ws| {
1521 if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
1522 for msg in entry.iter_messages_mut() {
1523 if matches!(msg, Message::Resolved) {
1524 *msg = Message::Data(snapshot);
1525 retains_needed += 1;
1526 }
1527 }
1528 }
1529 });
1530 // Pause-buffer path.
1531 if let Some(rec) = s.nodes.get_mut(&node_id) {
1532 if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1533 for msg in &mut *buffer {
1534 if matches!(msg, Message::Resolved) {
1535 *msg = Message::Data(snapshot);
1536 retains_needed += 1;
1537 }
1538 }
1539 }
1540 }
1541 drop(s);
1542 // Each rewritten Resolved → Data adds a payload retain that
1543 // queue_notify would otherwise have taken at emit time. The
1544 // snapshot already owns one retain (taken when cache was
1545 // snapshotted); we need one fresh retain per rewrite.
1546 for _ in 0..retains_needed {
1547 self.binding.retain_handle(snapshot);
1548 }
1549 }
1550
1551 /// Equals check that crosses the binding boundary lock-released for
1552 /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1553 fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1554 if a == b {
1555 return true; // identity-on-handles always sufficient
1556 }
1557 if a == NO_HANDLE || b == NO_HANDLE {
1558 return false;
1559 }
1560 match mode {
1561 EqualsMode::Identity => false,
1562 EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1563 }
1564 }
1565
1566 /// Commit a DATA emission **without** equals substitution — used by
1567 /// `FnResult::Batch` processing where multi-message waves pass through
1568 /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
1569 /// R1.3.1.a condition (b): only queued if node not already dirty.
1570 ///
1571 /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1572 /// but skips the Phase 2 equals check entirely.
1573 fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1574 assert!(
1575 new_handle != NO_HANDLE,
1576 "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1577 );
1578
1579 let mut s = self.lock_state();
1580 let rec = s.require_node(node_id);
1581 if rec.terminal.is_some() {
1582 drop(s);
1583 self.binding.release_handle(new_handle);
1584 return;
1585 }
1586
1587 // R1.3.1.a condition (b): DIRTY only if not already dirty.
1588 let already_dirty = s.require_node(node_id).dirty;
1589 s.require_node_mut(node_id).dirty = true;
1590 if !already_dirty {
1591 self.queue_notify(&mut s, node_id, Message::Dirty);
1592 }
1593
1594 // Always DATA — no equals substitution for Batch emissions.
1595 // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1596 // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1597 // in_tick lives on CoreState — read under held state lock.
1598 let current_cache = s.require_node(node_id).cache;
1599 let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
1600 use std::collections::hash_map::Entry;
1601 with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1602 Entry::Vacant(slot) => {
1603 slot.insert(current_cache);
1604 true
1605 }
1606 Entry::Occupied(_) => false,
1607 })
1608 } else {
1609 false
1610 };
1611 s.require_node_mut(node_id).cache = new_handle;
1612 if current_cache != NO_HANDLE && !snapshot_taken {
1613 self.binding.release_handle(current_cache);
1614 }
1615 // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1616 self.push_replay_buffer(&mut s, node_id, new_handle);
1617 // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
1618 // tier3_emitted_this_wave on the per-thread tracker even on the
1619 // verbatim path. A subsequent commit_emission at the same node
1620 // in the same wave needs this flag to detect multi-emit and
1621 // skip equals substitution; without it, a Batch-then-standard
1622 // sequence would queue Resolved into a wave that already has
1623 // Data — violating R1.3.3.a. The Batch path itself still
1624 // passes verbatim per R1.3.3.c (we don't re-run equals here);
1625 // we just record that "this node has emitted tier-3 in this
1626 // wave."
1627 tier3_mark(node_id);
1628 self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1629 // Propagate to children
1630 let child_ids: Vec<NodeId> = s
1631 .children
1632 .get(&node_id)
1633 .map(|c| c.iter().copied().collect())
1634 .unwrap_or_default();
1635 for child_id in child_ids {
1636 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1637 if let Some(idx) = dep_idx {
1638 self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1639 }
1640 }
1641 }
1642
1643 /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
1644 /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
1645 /// fresh retain on push. RESOLVED is NOT buffered per canonical
1646 /// "DATA only" — call sites only invoke this for Data emissions.
1647 ///
1648 /// Evicted handle is queued into `cps.deferred_handle_releases`
1649 /// (released lock-released at flush time) per the binding-boundary
1650 /// lock-release discipline — `release_handle` may re-enter Core via
1651 /// finalizers and must not run while the state lock is held
1652 /// (QA A3, 2026-05-07). Q2 (2026-05-09): the queue moved to
1653 /// CrossPartitionState; this fn acquires `cross_partition` only
1654 /// when an eviction actually happens (the common case is no
1655 /// eviction → no second-mutex acquire).
1656 fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
1657 let rec = s.require_node_mut(node_id);
1658 let cap = match rec.replay_buffer_cap {
1659 Some(c) if c > 0 => c,
1660 _ => return,
1661 };
1662 self.binding.retain_handle(new_handle);
1663 rec.replay_buffer.push_back(new_handle);
1664 let evicted = if rec.replay_buffer.len() > cap {
1665 rec.replay_buffer.pop_front()
1666 } else {
1667 None
1668 };
1669 if let Some(h) = evicted {
1670 with_wave_state(|ws| ws.deferred_handle_releases.push(h));
1671 }
1672 }
1673
1674 // ===================================================================
1675 // Operator dispatch (Slice C-1, D009).
1676 //
1677 // `fire_operator` is the entry point for nodes whose `kind` is
1678 // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
1679 // to per-operator helpers that snapshot inputs under the lock, drop the
1680 // lock to call the binding's bulk projection FFI, and reacquire to
1681 // apply emissions via `commit_emission_verbatim` (no per-item equals
1682 // dedup at the wire — operator output passes verbatim per the same
1683 // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1684 //
1685 // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1686 // share retains owned by the wave's data-batch slot (released at
1687 // wave-end rotation in `clear_wave_state`). Operators that emit those
1688 // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1689 // `prev` carry-over) take an additional retain via `retain_handle`
1690 // before passing to `commit_emission_verbatim` — the cache slot owns
1691 // its own share, independent of the data-batch slot's. Operators that
1692 // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1693 // packed tuples) receive retains pre-bumped by the binding's bulk-
1694 // projection method.
1695 // ===================================================================
1696
1697 /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1698 /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1699 /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1700 fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1701 // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1702 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
1703 // per-thread WaveState; state lock + WaveState borrow are
1704 // independent.
1705 let proceed = {
1706 let s = self.lock_state();
1707 with_wave_state(|ws| {
1708 ws.pending_fires.remove(&node_id);
1709 });
1710 let rec = s.require_node(node_id);
1711 if rec.terminal.is_some() {
1712 false
1713 } else {
1714 // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1715 // (D011) opt out of the gate; otherwise we wait for every
1716 // dep to have delivered at least one real handle. Terminal-
1717 // aware operators (currently `Reduce`) additionally count a
1718 // dep terminal as "real input" so they can fire on
1719 // upstream COMPLETE-without-DATA and emit the seed.
1720 let has_real_input = !rec.has_sentinel_deps()
1721 || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1722 rec.partial || has_real_input
1723 }
1724 };
1725 if !proceed {
1726 return;
1727 }
1728
1729 // A6 (Slice F, 2026-05-07): track operator fire on the
1730 // `currently_firing` stack so a binding-side project/predicate/fold
1731 // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1732 // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1733 // the stack on panic too.
1734 let _firing = FiringGuard::new(self, node_id);
1735
1736 match op {
1737 OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1738 OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1739 OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1740 OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1741 OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1742 self.fire_op_distinct(node_id, equals_fn_id);
1743 }
1744 OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1745 OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1746 OperatorOp::WithLatestFrom { pack_fn } => {
1747 self.fire_op_with_latest_from(node_id, pack_fn);
1748 }
1749 OperatorOp::Merge => self.fire_op_merge(node_id),
1750 OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1751 OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1752 OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1753 // The variant carries `default` for `register_operator`'s
1754 // `make_op_scratch` path; once registered, the live default
1755 // is read from `LastState::default` inside `fire_op_last`.
1756 OperatorOp::Last { .. } => self.fire_op_last(node_id),
1757 }
1758 }
1759
1760 /// Snapshot the operator's single dep batch (transform constraint —
1761 /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1762 /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1763 /// `dep_records[0].terminal` at snapshot time.
1764 fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1765 let s = self.lock_state();
1766 let rec = s.require_node(node_id);
1767 debug_assert!(
1768 !rec.dep_records.is_empty(),
1769 "transform operator must have ≥1 dep"
1770 );
1771 let dr = &rec.dep_records[0];
1772 (dr.data_batch.iter().copied().collect(), dr.terminal)
1773 }
1774
1775 /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1776 /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1777 /// when a wave's inputs all suppress and the operator needs to settle
1778 /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1779 /// on full-reject).
1780 fn settle_dirty_resolved(&self, node_id: NodeId) {
1781 let mut s = self.lock_state();
1782 if s.require_node(node_id).terminal.is_some() {
1783 return;
1784 }
1785 let already_dirty = s.require_node(node_id).dirty;
1786 s.require_node_mut(node_id).dirty = true;
1787 if !already_dirty {
1788 self.queue_notify(&mut s, node_id, Message::Dirty);
1789 }
1790 // Slice G: skip Resolved if pending_notify already has a tier-3
1791 // message — adding Resolved would violate R1.3.3.a.
1792 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
1793 // on per-thread WaveState; borrow scoped so queue_notify can
1794 // re-borrow.
1795 let already_tier3 = with_wave_state(|ws| {
1796 ws.pending_notify
1797 .get(&node_id)
1798 .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1799 });
1800 if !already_tier3 {
1801 self.queue_notify(&mut s, node_id, Message::Resolved);
1802 }
1803 }
1804
1805 /// `OperatorOp::Map` dispatch.
1806 fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1807 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1808 // Mark fired regardless of input count (activation gate already
1809 // satisfied or partial-mode).
1810 {
1811 let mut s = self.lock_state();
1812 s.require_node_mut(node_id).has_fired_once = true;
1813 }
1814 if inputs.is_empty() {
1815 return;
1816 }
1817 // Phase 2 (lock-released): bulk project. Binding returns one
1818 // handle per input, each with a retain share already taken.
1819 let outputs = self.binding.project_each(fn_id, &inputs);
1820 // Phase 3: emit each output. `commit_emission_verbatim` consumes
1821 // the retain into the cache slot (and releases the prior cache
1822 // handle internally).
1823 for h in outputs {
1824 self.commit_emission_verbatim(node_id, h);
1825 }
1826 }
1827
1828 /// `OperatorOp::Filter` dispatch (D012/D018).
1829 fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1830 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1831 {
1832 let mut s = self.lock_state();
1833 s.require_node_mut(node_id).has_fired_once = true;
1834 }
1835 if inputs.is_empty() {
1836 return;
1837 }
1838 // Phase 2: predicate per input.
1839 let pass = self.binding.predicate_each(fn_id, &inputs);
1840 debug_assert!(
1841 pass.len() == inputs.len(),
1842 "predicate_each returned {} bools for {} inputs",
1843 pass.len(),
1844 inputs.len()
1845 );
1846 // Phase 3: emit passing items verbatim. Take a fresh retain for
1847 // each — the data_batch slot still owns its retain (released at
1848 // wave-end rotation), and the cache slot needs its own.
1849 let mut emitted = 0usize;
1850 for (i, &h) in inputs.iter().enumerate() {
1851 if pass.get(i).copied().unwrap_or(false) {
1852 self.binding.retain_handle(h);
1853 self.commit_emission_verbatim(node_id, h);
1854 emitted += 1;
1855 }
1856 }
1857 // D018: full-reject settles with DIRTY+RESOLVED.
1858 if emitted == 0 {
1859 self.settle_dirty_resolved(node_id);
1860 }
1861 }
1862
1863 /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1864 fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1865 use crate::op_state::ScanState;
1866 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1867 let acc = {
1868 let s = self.lock_state();
1869 scratch_ref::<ScanState>(&s, node_id).acc
1870 };
1871 {
1872 let mut s = self.lock_state();
1873 s.require_node_mut(node_id).has_fired_once = true;
1874 }
1875 if inputs.is_empty() {
1876 return;
1877 }
1878 // Phase 2: fold each input through. Returns N new handles, each
1879 // with a fresh retain.
1880 let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1881 debug_assert!(
1882 new_states.len() == inputs.len(),
1883 "fold_each returned {} accs for {} inputs",
1884 new_states.len(),
1885 inputs.len()
1886 );
1887 // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1888 // extra retain for the slot; release the prior acc's slot retain.
1889 let last_acc = new_states.last().copied();
1890 if let Some(last) = last_acc {
1891 let prev_acc = {
1892 let mut s = self.lock_state();
1893 let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1894 let prev = scratch.acc;
1895 scratch.acc = last;
1896 prev
1897 };
1898 // Take the slot's retain on the new acc.
1899 self.binding.retain_handle(last);
1900 // Release the prior slot's retain (post-lock to keep binding
1901 // free to re-enter Core safely).
1902 if prev_acc != crate::handle::NO_HANDLE {
1903 self.binding.release_handle(prev_acc);
1904 }
1905 }
1906 // Phase 3b: emit each intermediate acc verbatim. `new_states`
1907 // entries each carry one retain from `fold_each`; that retain is
1908 // consumed by `commit_emission_verbatim` into the cache slot.
1909 for h in new_states {
1910 self.commit_emission_verbatim(node_id, h);
1911 }
1912 }
1913
1914 /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1915 /// upstream COMPLETE (cascades ERROR verbatim).
1916 fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1917 use crate::op_state::ReduceState;
1918 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1919 let acc = {
1920 let s = self.lock_state();
1921 scratch_ref::<ReduceState>(&s, node_id).acc
1922 };
1923 {
1924 let mut s = self.lock_state();
1925 s.require_node_mut(node_id).has_fired_once = true;
1926 }
1927 // Phase 2: accumulate (silent — no per-input emit).
1928 let new_states = if inputs.is_empty() {
1929 SmallVec::<[HandleId; 1]>::new()
1930 } else {
1931 self.binding.fold_each(fn_id, acc, &inputs)
1932 };
1933 debug_assert!(
1934 new_states.len() == inputs.len(),
1935 "fold_each returned {} accs for {} inputs",
1936 new_states.len(),
1937 inputs.len()
1938 );
1939 // Update ReduceState.acc to last new acc; release intermediate
1940 // states (we don't emit them) and the prior acc's slot retain.
1941 let last_acc = new_states.last().copied();
1942 let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1943 new_states[..new_states.len() - 1].to_vec()
1944 } else {
1945 Vec::new()
1946 };
1947 let prev_acc_to_release = if let Some(last) = last_acc {
1948 let prev_acc = {
1949 let mut s = self.lock_state();
1950 let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1951 let prev = scratch.acc;
1952 scratch.acc = last;
1953 prev
1954 };
1955 self.binding.retain_handle(last);
1956 if prev_acc == crate::handle::NO_HANDLE {
1957 None
1958 } else {
1959 Some(prev_acc)
1960 }
1961 } else {
1962 None
1963 };
1964 // Release intermediate fold results (Reduce only emits the LAST,
1965 // but only on terminal). Each was retained by `fold_each`.
1966 for h in intermediates_to_release {
1967 self.binding.release_handle(h);
1968 }
1969 if let Some(h) = prev_acc_to_release {
1970 self.binding.release_handle(h);
1971 }
1972
1973 // Phase 3: emit on terminal.
1974 match terminal {
1975 None => {
1976 // Still accumulating; no emit. Subscribers see no message
1977 // for this wave (silent accumulation). The first wave that
1978 // pushes Reduce to fire produces a Dirty entry on the
1979 // upstream's commit, but Reduce itself doesn't queue any
1980 // tier-3 since R5 silently absorbs. v1: leave the
1981 // post-drain auto-resolve sweep to settle nothing —
1982 // pending_notify has no entry for Reduce so the sweep is
1983 // a no-op.
1984 }
1985 Some(TerminalKind::Complete) => {
1986 // Read the live acc (may be the seed if no DATA arrived)
1987 // and emit Data(acc) + Complete.
1988 let final_acc = {
1989 let s = self.lock_state();
1990 scratch_ref::<ReduceState>(&s, node_id).acc
1991 };
1992 if final_acc != crate::handle::NO_HANDLE {
1993 // Emission needs its own retain (slot's retain is
1994 // owned by ReduceState.acc until reset/Drop).
1995 self.binding.retain_handle(final_acc);
1996 self.commit_emission_verbatim(node_id, final_acc);
1997 }
1998 self.complete(node_id);
1999 }
2000 Some(TerminalKind::Error(h)) => {
2001 // Core::error transfers the caller's share into the
2002 // cascade (node.terminal + per-child dep_terminal slots);
2003 // no release at the error() boundary. Take a fresh share
2004 // here so the cascade owns it independently of the
2005 // dep_records[0].terminal slot's share.
2006 self.binding.retain_handle(h);
2007 self.error(node_id, h);
2008 }
2009 }
2010 }
2011
2012 /// `OperatorOp::DistinctUntilChanged` dispatch.
2013 fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
2014 use crate::op_state::DistinctState;
2015 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2016 let mut prev = {
2017 let s = self.lock_state();
2018 scratch_ref::<DistinctState>(&s, node_id).prev
2019 };
2020 {
2021 let mut s = self.lock_state();
2022 s.require_node_mut(node_id).has_fired_once = true;
2023 }
2024 if inputs.is_empty() {
2025 return;
2026 }
2027 // Take a working-copy retain on the initial prev so both the loop
2028 // (which releases old_prev on each non-equal item) and phase 3
2029 // (which releases the slot's original handle) each have their own
2030 // share. Without this, the loop's release of old_prev (== original
2031 // DistinctState.prev) double-releases against phase 3's stale_slot
2032 // release.
2033 if prev != crate::handle::NO_HANDLE {
2034 self.binding.retain_handle(prev);
2035 }
2036 // Phase 2: per-input equals(prev, current). Each non-equal input
2037 // is emitted and becomes the new prev. Equals fn_id reuses
2038 // `BindingBoundary::custom_equals`.
2039 let mut emitted = 0usize;
2040 for &h in &inputs {
2041 let equal = if prev == crate::handle::NO_HANDLE {
2042 false
2043 } else if prev == h {
2044 true
2045 } else {
2046 self.binding.custom_equals(equals_fn_id, prev, h)
2047 };
2048 if !equal {
2049 // Emit this input verbatim.
2050 self.binding.retain_handle(h);
2051 self.commit_emission_verbatim(node_id, h);
2052 // Update prev: take retain on new prev, release old
2053 // (working-copy retain from above or from prior iteration).
2054 self.binding.retain_handle(h);
2055 let old_prev = prev;
2056 prev = h;
2057 if old_prev != crate::handle::NO_HANDLE {
2058 self.binding.release_handle(old_prev);
2059 }
2060 emitted += 1;
2061 }
2062 }
2063 // Phase 3: persist prev into DistinctState.prev slot. Release the
2064 // slot's original retain (stale_slot) — this is the slot-owned
2065 // share, independent of the working-copy share released in the
2066 // loop above.
2067 {
2068 let mut s = self.lock_state();
2069 let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
2070 let stale_slot = scratch.prev;
2071 scratch.prev = prev;
2072 if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2073 drop(s);
2074 self.binding.release_handle(stale_slot);
2075 }
2076 }
2077 // Release the working-copy retain on the final prev if it was
2078 // never released in the loop (i.e. no non-equal items passed,
2079 // prev == original). In that case stale_slot == prev, so phase 3
2080 // didn't release it either — but the working-copy retain is still
2081 // outstanding. Release it now.
2082 if emitted == 0 && prev != crate::handle::NO_HANDLE {
2083 self.binding.release_handle(prev);
2084 }
2085 if emitted == 0 {
2086 self.settle_dirty_resolved(node_id);
2087 }
2088 }
2089
2090 /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
2091 /// starting after the second value (first input swallowed, sets `prev`).
2092 fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2093 use crate::op_state::PairwiseState;
2094 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2095 let mut prev = {
2096 let s = self.lock_state();
2097 scratch_ref::<PairwiseState>(&s, node_id).prev
2098 };
2099 {
2100 let mut s = self.lock_state();
2101 s.require_node_mut(node_id).has_fired_once = true;
2102 }
2103 if inputs.is_empty() {
2104 return;
2105 }
2106 let mut emitted = 0usize;
2107 for &h in &inputs {
2108 if prev == crate::handle::NO_HANDLE {
2109 // First-ever value — swallow, set prev. Retain for the
2110 // PairwiseState.prev slot (persisted in phase 3 below).
2111 self.binding.retain_handle(h);
2112 prev = h;
2113 continue;
2114 }
2115 // Pack (prev, current) into a tuple handle. Binding returns a
2116 // fresh retain on the packed handle.
2117 let packed = self.binding.pairwise_pack(fn_id, prev, h);
2118 self.commit_emission_verbatim(node_id, packed);
2119 // Advance prev: take retain on h, release old prev.
2120 self.binding.retain_handle(h);
2121 let old_prev = prev;
2122 prev = h;
2123 self.binding.release_handle(old_prev);
2124 emitted += 1;
2125 }
2126 // Persist prev into PairwiseState.prev slot.
2127 {
2128 let mut s = self.lock_state();
2129 let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
2130 let stale_slot = scratch.prev;
2131 scratch.prev = prev;
2132 if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2133 drop(s);
2134 self.binding.release_handle(stale_slot);
2135 }
2136 }
2137 if emitted == 0 {
2138 self.settle_dirty_resolved(node_id);
2139 }
2140 }
2141
2142 // =================================================================
2143 // Slice C-2: multi-dep combinator operators (D020)
2144 // =================================================================
2145
2146 /// Snapshot all deps' "latest" handle for multi-dep combinators.
2147 /// For each dep: returns `data_batch.last()` if non-empty (dep fired
2148 /// this wave), else `prev_data` (last handle from previous wave).
2149 /// Also returns whether dep[0] (primary) had DATA this wave —
2150 /// needed by `fire_op_with_latest_from`.
2151 fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
2152 let s = self.lock_state();
2153 let rec = s.require_node(node_id);
2154 let primary_fired = rec
2155 .dep_records
2156 .first()
2157 .is_some_and(|dr| !dr.data_batch.is_empty());
2158 let latest: SmallVec<[HandleId; 4]> = rec
2159 .dep_records
2160 .iter()
2161 .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
2162 .collect();
2163 (latest, primary_fired)
2164 }
2165
2166 /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
2167 /// latest handle per dep into a tuple via `pack_tuple`, emits on
2168 /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
2169 /// all deps have a real handle on first fire. Post-warmup INVALIDATE
2170 /// guard: if any dep's prev_data was cleared, settles with RESOLVED
2171 /// instead of packing a NO_HANDLE into the tuple.
2172 fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2173 let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
2174 {
2175 let mut s = self.lock_state();
2176 s.require_node_mut(node_id).has_fired_once = true;
2177 }
2178 // Post-warmup INVALIDATE guard: a dep may have been invalidated
2179 // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2180 if latest.contains(&crate::handle::NO_HANDLE) {
2181 self.settle_dirty_resolved(node_id);
2182 return;
2183 }
2184 let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2185 self.commit_emission_verbatim(node_id, tuple_handle);
2186 }
2187
2188 /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
2189 /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
2190 /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
2191 /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
2192 /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
2193 fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2194 let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
2195 let first_fire = {
2196 let mut s = self.lock_state();
2197 let rec = s.require_node_mut(node_id);
2198 let was_first = !rec.has_fired_once;
2199 rec.has_fired_once = true;
2200 was_first
2201 };
2202 // On first fire (gate release), always emit — the first-run gate
2203 // guarantees both deps have values (via prev_data fallback in
2204 // snapshot). On subsequent fires, only emit when primary fires.
2205 if !first_fire && !primary_fired {
2206 // Secondary-only update — no downstream DATA.
2207 self.settle_dirty_resolved(node_id);
2208 return;
2209 }
2210 // Post-warmup INVALIDATE guard: secondary may have been invalidated
2211 // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2212 debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
2213 if latest[1] == crate::handle::NO_HANDLE {
2214 self.settle_dirty_resolved(node_id);
2215 return;
2216 }
2217 let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2218 self.commit_emission_verbatim(node_id, tuple_handle);
2219 }
2220
2221 /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
2222 /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
2223 /// batch handles are collected, retained, and emitted individually.
2224 fn fire_op_merge(&self, node_id: NodeId) {
2225 // Collect all batch handles from all deps (flat).
2226 let all_handles: Vec<HandleId> = {
2227 let s = self.lock_state();
2228 let rec = s.require_node(node_id);
2229 rec.dep_records
2230 .iter()
2231 .flat_map(|dr| dr.data_batch.iter().copied())
2232 .collect()
2233 };
2234 {
2235 let mut s = self.lock_state();
2236 s.require_node_mut(node_id).has_fired_once = true;
2237 }
2238 if all_handles.is_empty() {
2239 // All deps settled RESOLVED this wave — no DATA to forward.
2240 self.settle_dirty_resolved(node_id);
2241 return;
2242 }
2243 // Emit each handle verbatim. Take a fresh retain per handle
2244 // (independent of the dep batch's retain which gets released at
2245 // wave-end). Matches Filter's discipline for passing inputs.
2246 for &h in &all_handles {
2247 self.binding.retain_handle(h);
2248 self.commit_emission_verbatim(node_id, h);
2249 }
2250 }
2251
2252 // =================================================================
2253 // Slice C-3: flow operators (D024)
2254 // =================================================================
2255
2256 /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
2257 /// then self-completes via `Core::complete`. When `count == 0`, the
2258 /// first fire emits zero items then immediately self-completes
2259 /// (D027). Cross-wave counter lives in
2260 /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
2261 fn fire_op_take(&self, node_id: NodeId, count: u32) {
2262 use crate::op_state::TakeState;
2263 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2264 // Snapshot current counter; mark fired regardless of input count
2265 // (activation gate already satisfied or partial-mode).
2266 let mut count_emitted = {
2267 let s = self.lock_state();
2268 scratch_ref::<TakeState>(&s, node_id).count_emitted
2269 };
2270 {
2271 let mut s = self.lock_state();
2272 s.require_node_mut(node_id).has_fired_once = true;
2273 }
2274 // Already at quota before any input this wave — self-complete
2275 // immediately. Covers `count == 0` (first-fire short-circuit) and
2276 // any defensive re-entry after the terminal-skip in `fire_operator`
2277 // already guards against double-complete.
2278 if count_emitted >= count {
2279 self.complete(node_id);
2280 return;
2281 }
2282 // Per-input emission loop. Each pass takes a fresh retain for the
2283 // cache slot; data_batch slot's retain is released at wave-end
2284 // rotation independently.
2285 for &h in &inputs {
2286 self.binding.retain_handle(h);
2287 self.commit_emission_verbatim(node_id, h);
2288 count_emitted = count_emitted.saturating_add(1);
2289 if count_emitted >= count {
2290 break;
2291 }
2292 }
2293 // Persist the updated counter.
2294 {
2295 let mut s = self.lock_state();
2296 scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
2297 }
2298 // Self-complete if we hit the quota this wave. Upstream COMPLETE
2299 // (terminal == Some(Complete)) without us hitting the count
2300 // propagates via the standard auto-cascade — we don't intercept it.
2301 if count_emitted >= count {
2302 self.complete(node_id);
2303 return;
2304 }
2305 // If upstream is already Errored and we haven't hit count, the
2306 // standard cascade will propagate it. If the wave delivered no
2307 // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
2308 // subscribers see the wave close.
2309 if inputs.is_empty() && terminal.is_none() {
2310 self.settle_dirty_resolved(node_id);
2311 }
2312 }
2313
2314 /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
2315 /// then forwards the rest. Cross-wave counter lives in
2316 /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
2317 /// On a wave where every input is still in the skip window, settles
2318 /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
2319 fn fire_op_skip(&self, node_id: NodeId, count: u32) {
2320 use crate::op_state::SkipState;
2321 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2322 let mut count_skipped = {
2323 let s = self.lock_state();
2324 scratch_ref::<SkipState>(&s, node_id).count_skipped
2325 };
2326 {
2327 let mut s = self.lock_state();
2328 s.require_node_mut(node_id).has_fired_once = true;
2329 }
2330 // No early-return on empty inputs: the post-loop `emitted == 0`
2331 // settle handles the empty-inputs case identically to the
2332 // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
2333 // with `fire_op_take`).
2334 let mut emitted = 0usize;
2335 for &h in &inputs {
2336 if count_skipped < count {
2337 count_skipped = count_skipped.saturating_add(1);
2338 // Drop this input — the data_batch slot still owns its
2339 // retain (released at wave-end rotation). No emission.
2340 continue;
2341 }
2342 // Past the skip window — emit verbatim. Take a fresh retain
2343 // for the cache slot.
2344 self.binding.retain_handle(h);
2345 self.commit_emission_verbatim(node_id, h);
2346 emitted += 1;
2347 }
2348 // Persist the updated counter.
2349 {
2350 let mut s = self.lock_state();
2351 scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
2352 }
2353 if emitted == 0 {
2354 self.settle_dirty_resolved(node_id);
2355 }
2356 }
2357
2358 /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
2359 /// holds; on the first `false`, emits any preceding passes from the
2360 /// same batch then self-completes via `Core::complete`. Reuses
2361 /// [`BindingBoundary::predicate_each`] (D029).
2362 fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2363 let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2364 {
2365 let mut s = self.lock_state();
2366 s.require_node_mut(node_id).has_fired_once = true;
2367 }
2368 if inputs.is_empty() {
2369 return;
2370 }
2371 // Phase 2: predicate per input.
2372 let pass = self.binding.predicate_each(fn_id, &inputs);
2373 debug_assert!(
2374 pass.len() == inputs.len(),
2375 "predicate_each returned {} bools for {} inputs",
2376 pass.len(),
2377 inputs.len()
2378 );
2379 // Phase 3: emit each input until the first false; then
2380 // self-complete. `fire_operator`'s `terminal.is_some()`
2381 // short-circuit gates re-entry after the self-complete cascade
2382 // installs the terminal slot — no extra `done` flag needed.
2383 let mut emitted = 0usize;
2384 let mut first_false_seen = false;
2385 for (i, &h) in inputs.iter().enumerate() {
2386 if pass.get(i).copied().unwrap_or(false) {
2387 self.binding.retain_handle(h);
2388 self.commit_emission_verbatim(node_id, h);
2389 emitted += 1;
2390 } else {
2391 first_false_seen = true;
2392 break;
2393 }
2394 }
2395 if first_false_seen {
2396 self.complete(node_id);
2397 return;
2398 }
2399 if emitted == 0 {
2400 // Whole batch passed but was empty (impossible here since
2401 // inputs.is_empty() returned early above) — defensive only.
2402 self.settle_dirty_resolved(node_id);
2403 }
2404 }
2405
2406 /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
2407 /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
2408 /// default was registered) then `Complete` on upstream COMPLETE.
2409 /// On upstream ERROR, propagates verbatim. Storage:
2410 /// [`LastState`](super::op_state::LastState).
2411 ///
2412 /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
2413 /// wave (`terminal == None`), `fire_op_last` updates the buffered
2414 /// `latest` handle but produces NO downstream wire message —
2415 /// subscribers observe the operator only when upstream
2416 /// COMPLETE/ERROR triggers the terminal branch. Intermediate
2417 /// inputs from the dep's batch are dropped on the floor (their
2418 /// `data_batch` retains release at wave-end rotation
2419 /// independently). Per-wave settlement on intermediate waves is
2420 /// the canonical behavior for terminal-aware operators.
2421 fn fire_op_last(&self, node_id: NodeId) {
2422 use crate::op_state::LastState;
2423 let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2424 {
2425 let mut s = self.lock_state();
2426 s.require_node_mut(node_id).has_fired_once = true;
2427 }
2428
2429 // Phase 2: buffer the latest input handle (if any). Retain new,
2430 // release old. data_batch slot's retain is released at wave-end
2431 // rotation independently — the LastState slot keeps its own
2432 // share so the value survives across waves.
2433 if let Some(&new_latest) = inputs.last() {
2434 let prev_latest = {
2435 let mut s = self.lock_state();
2436 let scratch = scratch_mut::<LastState>(&mut s, node_id);
2437 let prev = scratch.latest;
2438 scratch.latest = new_latest;
2439 prev
2440 };
2441 self.binding.retain_handle(new_latest);
2442 if prev_latest != crate::handle::NO_HANDLE {
2443 self.binding.release_handle(prev_latest);
2444 }
2445 }
2446
2447 // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
2448 // produce no downstream message — Reduce-style silent
2449 // accumulation. The post-drain auto-resolve sweep is a no-op
2450 // because pending_notify has no entry for Last.
2451 match terminal {
2452 None => {}
2453 Some(TerminalKind::Complete) => {
2454 // Read the live latest + default. If latest != NO_HANDLE,
2455 // emit it. Otherwise, if default != NO_HANDLE, emit default.
2456 // Otherwise, emit only Complete (empty stream, no default).
2457 let (latest, default) = {
2458 let s = self.lock_state();
2459 let scratch = scratch_ref::<LastState>(&s, node_id);
2460 (scratch.latest, scratch.default)
2461 };
2462 let to_emit = if latest != crate::handle::NO_HANDLE {
2463 Some(latest)
2464 } else if default != crate::handle::NO_HANDLE {
2465 Some(default)
2466 } else {
2467 None
2468 };
2469 if let Some(h) = to_emit {
2470 // Emission needs its own retain — the LastState slot
2471 // keeps its share until reset/Drop.
2472 self.binding.retain_handle(h);
2473 self.commit_emission_verbatim(node_id, h);
2474 }
2475 self.complete(node_id);
2476 }
2477 Some(TerminalKind::Error(h)) => {
2478 // Take a fresh share for the error cascade — the
2479 // dep_records[0].terminal slot keeps its own share
2480 // (released by reset_for_fresh_lifecycle / Drop).
2481 self.binding.retain_handle(h);
2482 self.error(node_id, h);
2483 }
2484 }
2485 }
2486
2487 pub(crate) fn deliver_data_to_consumer(
2488 &self,
2489 s: &mut CoreState,
2490 consumer_id: NodeId,
2491 dep_idx: usize,
2492 handle: HandleId,
2493 ) {
2494 // Retain the handle for the batch accumulation slot — each DATA
2495 // handle in `data_batch` owns a retain share, released at wave-end
2496 // rotation in `clear_wave_state`.
2497 self.binding.retain_handle(handle);
2498
2499 let is_dynamic;
2500 let is_state;
2501 let tracked_or_first_fire;
2502 // Slice F audit close (2026-05-07): default-mode pause suppression.
2503 // If the consumer is paused with `PausableMode::Default`, the
2504 // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
2505 // pause-window dep deliveries into one fn execution on RESUME.
2506 // Mark `pending_wave` on the pause state instead of adding to
2507 // `pending_fires`. The dep state still advances (the data_batch push
2508 // above is unchanged), and clear_wave_state still rotates the latest
2509 // dep DATA into prev_data — so when the fn ultimately fires on
2510 // RESUME, it sees the consolidated post-pause state.
2511 let suppressed_for_default_pause;
2512 {
2513 let consumer = s.require_node_mut(consumer_id);
2514 consumer.dep_records[dep_idx].data_batch.push(handle);
2515 consumer.dep_records[dep_idx].involved_this_wave = true;
2516 consumer.involved_this_wave = true;
2517 is_dynamic = consumer.is_dynamic;
2518 is_state = consumer.is_state();
2519 tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
2520 suppressed_for_default_pause = consumer.pause_state.is_paused()
2521 && consumer.pausable == crate::node::PausableMode::Default;
2522 if suppressed_for_default_pause {
2523 consumer.pause_state.mark_pending_wave();
2524 }
2525 }
2526 if suppressed_for_default_pause {
2527 // Default-mode pause: don't add to pending_fires; RESUME will
2528 // schedule one consolidated fire.
2529 return;
2530 }
2531 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
2532 // per-thread WaveState. State lock + WaveState borrow are
2533 // independent.
2534 if is_state {
2535 // State nodes don't have deps; unreachable in practice.
2536 } else if is_dynamic {
2537 if tracked_or_first_fire {
2538 with_wave_state(|ws| {
2539 ws.pending_fires.insert(consumer_id);
2540 });
2541 }
2542 } else {
2543 // Derived / Operator / Producer (Producer has no deps so won't
2544 // reach here, but the predicate-based dispatch handles it
2545 // uniformly).
2546 with_wave_state(|ws| {
2547 ws.pending_fires.insert(consumer_id);
2548 });
2549 }
2550 }
2551
2552 // -------------------------------------------------------------------
2553 // Subscriber notification
2554 // -------------------------------------------------------------------
2555
2556 /// Queue a wave-end message for `node_id`'s subscribers.
2557 ///
2558 /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
2559 /// 2026-05-08):** each push for a given node either appends the
2560 /// message to the open batch (if `NodeRecord::subscribers_revision`
2561 /// hasn't advanced since that batch opened — the common case — no
2562 /// extra allocation), or opens a fresh batch with a current sink
2563 /// snapshot frozen at the new revision. A sub installed mid-wave
2564 /// bumps `subscribers_revision`; the next `queue_notify` for the
2565 /// same node observes the bump and starts a new batch that includes
2566 /// the new sub. Pre-subscribe batches retain their original snapshot,
2567 /// so earlier emits flush to their original sink list — the new sub
2568 /// does NOT double-receive them via flush AND handshake replay,
2569 /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
2570 ///
2571 /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
2572 /// Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
2573 /// paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
2574 /// COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
2575 /// flush immediately. START (tier 0) is per-subscription and never
2576 /// transits queue_notify.
2577 pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
2578 // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
2579 // wave-content invariant assertion. The tier-3 slot at one node in
2580 // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
2581 // never multiple RESOLVED. Slice G moved equals substitution from
2582 // per-emit to wave-end coalescing; this assert pins that the
2583 // dispatcher itself never queues a violating combination at the
2584 // queue_notify granularity. Resolved arrivals come from:
2585 // 1. The auto-resolve sweep in `drain_and_flush` (gates on
2586 // `!any tier-3` so it can't add to a wave with Data).
2587 // 2. The wave-end equals-substitution pass (rewrites in place,
2588 // doesn't go through queue_notify).
2589 // Both honor R1.3.3.a by construction post-Slice-G.
2590 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2591 // on per-thread WaveState. The dev-mode invariant assertion
2592 // borrows WaveState briefly and drops before the rest of
2593 // queue_notify proceeds.
2594 #[cfg(debug_assertions)]
2595 if matches!(msg.tier(), 3) {
2596 with_wave_state(|ws| {
2597 if let Some(entry) = ws.pending_notify.get(&node_id) {
2598 // Walk all batches' messages — R1.3.3.a is a per-node
2599 // wave-content invariant, not per-batch (the X4 batches
2600 // are subscriber-snapshot epochs; the protocol-level
2601 // tier-3 invariant spans the whole wave for the node).
2602 let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
2603 let resolved_count = entry
2604 .iter_messages()
2605 .filter(|m| matches!(m, Message::Resolved))
2606 .count();
2607 let incoming_is_data = matches!(msg, Message::Data(_));
2608 if incoming_is_data {
2609 debug_assert!(
2610 resolved_count == 0,
2611 "R1.3.3.a violation at {node_id:?}: queueing Data into a \
2612 wave that already contains Resolved — Slice G should have \
2613 prevented this via wave-end coalescing"
2614 );
2615 } else {
2616 debug_assert!(
2617 !has_data,
2618 "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
2619 wave that already contains Data"
2620 );
2621 debug_assert!(
2622 resolved_count == 0,
2623 "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
2624 wave at one node"
2625 );
2626 }
2627 }
2628 });
2629 }
2630
2631 let buffered_tier = matches!(msg.tier(), 3 | 4);
2632 let cap = s.pause_buffer_cap;
2633
2634 // Pause-routing branch — handles its own retain/release and returns
2635 // before we touch `pending_notify`, so the rec borrow is contained.
2636 {
2637 let rec = s.require_node_mut(node_id);
2638 if rec.subscribers.is_empty() {
2639 return;
2640 }
2641 // Slice F audit close (2026-05-07): pause routing depends on mode.
2642 // - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
2643 // - `Default` + STATE node: state nodes have no fn-fire to
2644 // suppress, so buffer like resumeAll (collapse-to-latest is
2645 // a future enhancement; v1 keeps verbatim).
2646 // - `Default` + COMPUTE node: suppression happens upstream at
2647 // fn-fire scheduling (see `deliver_data_to_consumer`); no
2648 // outgoing tier-3 is produced from this node while paused,
2649 // so this branch is unreachable for compute-default-paused.
2650 // Fallthrough to the non-paused queue path is fine.
2651 // - `Off`: pause is ignored entirely — tier-3 flushes
2652 // immediately. Fallthrough.
2653 let mode_buffers_tier3 = match rec.pausable {
2654 crate::node::PausableMode::ResumeAll => true,
2655 crate::node::PausableMode::Default => rec.is_state(),
2656 crate::node::PausableMode::Off => false,
2657 };
2658 if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
2659 if let Some(h) = msg.payload_handle() {
2660 self.binding.retain_handle(h);
2661 }
2662 let push_result = rec.pause_state.push_buffered(msg, cap);
2663 for dm in push_result.dropped_msgs {
2664 if let Some(h) = dm.payload_handle() {
2665 self.binding.release_handle(h);
2666 }
2667 }
2668 // R1.3.8.c (Slice F, A3): on first overflow this cycle,
2669 // schedule a synthesized ERROR for wave-end emission.
2670 // `cap` is `Some` here (an overflow can only happen with a
2671 // configured cap), so `unwrap` is safe.
2672 if push_result.first_overflow_this_cycle {
2673 if let Some((dropped_count, lock_held_ns)) =
2674 rec.pause_state.overflow_diagnostic()
2675 {
2676 // Q-beyond Sub-slice 1 (D108, 2026-05-09):
2677 // pending_pause_overflow lives on per-thread WaveState.
2678 with_wave_state(|ws| {
2679 ws.pending_pause_overflow
2680 .push(crate::node::PendingPauseOverflow {
2681 node_id,
2682 dropped_count,
2683 configured_max: cap.unwrap_or(0),
2684 lock_held_ns,
2685 });
2686 });
2687 }
2688 }
2689 return;
2690 }
2691 }
2692
2693 // Non-paused queue path: retain payload handle and queue into
2694 // pending_notify. Released in `flush_notifications` after sinks
2695 // fire.
2696 if let Some(h) = msg.payload_handle() {
2697 self.binding.retain_handle(h);
2698 }
2699 Self::push_into_pending_notify(s, node_id, msg);
2700 }
2701
2702 /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
2703 /// non-paused path. Either appends `msg` to the open batch (if
2704 /// `subscribers_revision` hasn't advanced since it opened — common
2705 /// case, no extra allocation) or opens a fresh batch with a current
2706 /// sink snapshot frozen at the new revision.
2707 ///
2708 /// Borrow discipline: reads `subscribers_revision` and the snapshot
2709 /// from `s.nodes` BEFORE borrowing WaveState's `pending_notify` to
2710 /// keep the two scopes disjoint.
2711 ///
2712 /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
2713 /// to per-thread WaveState. The state-side read of
2714 /// `subscribers_revision` / `subscribers` happens before the
2715 /// `with_wave_state` block opens, then the WaveState borrow
2716 /// performs the entry insertion / append. State lock + WaveState
2717 /// borrow remain independent.
2718 ///
2719 /// Lock-discipline assumption: this read of `subscribers_revision`
2720 /// is safe because both the subscribe install path
2721 /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
2722 /// `CoreState`'s mutex when they bump / read the revision —
2723 /// concurrent subscribe/unsubscribe cannot interleave. **If
2724 /// `Core::subscribe` ever moves the sink-install lock-released
2725 /// (mirroring the lock-released drain refactor), the revision read
2726 /// here must re-validate post-borrow — otherwise a fresh batch
2727 /// could open with a stale snapshot.**
2728 fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
2729 let current_rev = s.require_node(node_id).subscribers_revision;
2730 let needs_new_batch = with_wave_state(|ws| {
2731 ws.pending_notify.get(&node_id).is_none_or(|entry| {
2732 entry
2733 .batches
2734 .last()
2735 .is_none_or(|b| b.snapshot_revision != current_rev)
2736 })
2737 });
2738 let sinks_snapshot: Vec<Sink> = if needs_new_batch {
2739 s.require_node(node_id)
2740 .subscribers
2741 .values()
2742 .cloned()
2743 .collect()
2744 } else {
2745 Vec::new()
2746 };
2747 with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
2748 Entry::Vacant(slot) => {
2749 let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
2750 batches.push(PendingBatch {
2751 snapshot_revision: current_rev,
2752 sinks: sinks_snapshot,
2753 messages: vec![msg],
2754 });
2755 slot.insert(PendingPerNode { batches });
2756 }
2757 Entry::Occupied(mut slot) => {
2758 let entry = slot.get_mut();
2759 if needs_new_batch {
2760 entry.batches.push(PendingBatch {
2761 snapshot_revision: current_rev,
2762 sinks: sinks_snapshot,
2763 messages: vec![msg],
2764 });
2765 } else {
2766 entry
2767 .batches
2768 .last_mut()
2769 .expect("non-empty by construction (entry exists implies batch exists)")
2770 .messages
2771 .push(msg);
2772 }
2773 }
2774 });
2775 }
2776
2777 /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
2778 /// payload-handle releases owed for `pending_notify` into
2779 /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
2780 /// run **after** the state lock is dropped — see [`Core::run_wave`].
2781 ///
2782 /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
2783 /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
2784 /// here as cross-node tier-then-node collect — phase 1's jobs sit before
2785 /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
2786 /// queue lock-released, multi-node subscribers see all DIRTYs before any
2787 /// settle. Matches TS's drainPhase model without the per-tier queue
2788 /// indirection.
2789 ///
2790 /// Phase ordering:
2791 /// 1 → tier 1 (DIRTY)
2792 /// 2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
2793 /// 3 → tier 5 (COMPLETE/ERROR)
2794 /// 4 → tier 6 (TEARDOWN)
2795 ///
2796 /// Tier 0 (START) is per-subscription (never enters pending_notify) and
2797 /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
2798 /// bypassing pending_notify; both are absent from this enumeration.
2799 ///
2800 /// Within a single phase, per-node insertion order (IndexMap iteration)
2801 /// is preserved — an emit on A before B → A's phase-2 messages flush
2802 /// before B's. Within a single node, message order is preserved.
2803 fn flush_notifications(&self, s: &mut CoreState) {
2804 const PHASES: &[&[u8]] = &[
2805 &[1], // DIRTY
2806 &[3, 4], // DATA/RESOLVED + INVALIDATE
2807 &[5], // COMPLETE/ERROR
2808 &[6], // TEARDOWN
2809 ];
2810 // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
2811 // deferred_handle_releases, and deferred_flush_jobs all live on
2812 // per-thread WaveState. Take pending_notify under the WaveState
2813 // borrow, drop the borrow, run the per-phase loop (no WaveState
2814 // access in the loop body), then re-borrow WaveState at the end
2815 // to push the collected jobs and payload-handle releases.
2816 //
2817 // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
2818 // currently unused inside the per-phase loop — `pending` was
2819 // moved off `s` to WaveState by sub-slice 2, and the per-batch
2820 // sink snapshot is already on the PendingBatch. Kept as a
2821 // parameter to preserve the caller's `let mut s = lock_state();
2822 // self.flush_notifications(&mut s);` invocation shape (caller
2823 // holds the state lock around this call — load-bearing for
2824 // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
2825 // released" marker; the lock guard belongs to the caller and
2826 // is held throughout this function. A future change that adds
2827 // an in-loop state read should remove the discard below;
2828 // removing the parameter would break the caller's ability to
2829 // express the lock-discipline contract at the call site.
2830 let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
2831 let pending = with_wave_state(|ws| std::mem::take(&mut ws.pending_notify));
2832 let mut jobs: DeferredJobs = Vec::new();
2833 for &phase_tiers in PHASES {
2834 for (_node_id, entry) in &pending {
2835 // Slice X4 / D2: iterate batches in arrival order. Each
2836 // batch carries its own sink snapshot frozen at open-time;
2837 // a batch's messages flush to ITS sinks only. Within a
2838 // single (phase, node), batches stay in arrival order so
2839 // emit-order semantics are preserved across batches.
2840 for batch in &entry.batches {
2841 if batch.sinks.is_empty() {
2842 continue;
2843 }
2844 let phase_msgs: Vec<Message> = batch
2845 .messages
2846 .iter()
2847 .copied()
2848 .filter(|m| phase_tiers.contains(&m.tier()))
2849 .collect();
2850 if phase_msgs.is_empty() {
2851 continue;
2852 }
2853 let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
2854 jobs.push((sinks_clone, phase_msgs));
2855 }
2856 }
2857 }
2858 // Single WaveState borrow at the end: push the collected jobs
2859 // and the payload-handle releases. Refcount release balances the
2860 // retain done in `queue_notify` for every payload-bearing message
2861 // that landed in pending_notify (across ALL batches per node);
2862 // deferred to post-lock-drop so the binding's release path can't
2863 // re-enter Core under our lock.
2864 with_wave_state(|ws| {
2865 ws.deferred_flush_jobs.append(&mut jobs);
2866 for entry in pending.values() {
2867 for msg in entry.iter_messages() {
2868 if let Some(h) = msg.payload_handle() {
2869 ws.deferred_handle_releases.push(h);
2870 }
2871 }
2872 }
2873 });
2874 }
2875
2876 /// Take the deferred sink-fire jobs, payload-handle releases,
2877 /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
2878 /// Callers pair this with `drop(state_guard)` and a subsequent
2879 /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
2880 /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
2881 /// Q2(b) eager wipe_ctx fires lock-released.
2882 ///
2883 /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
2884 /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
2885 /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
2886 /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
2887 /// WaveState. The `_s: &mut CoreState` parameter is now unused but
2888 /// kept to preserve the call-site lock-discipline ordering (caller
2889 /// holds the state lock around this call to interleave with prior
2890 /// `clear_wave_state` per-NodeRecord work).
2891 pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
2892 (
2893 std::mem::take(&mut ws.deferred_flush_jobs),
2894 std::mem::take(&mut ws.deferred_handle_releases),
2895 std::mem::take(&mut ws.deferred_cleanup_hooks),
2896 std::mem::take(&mut ws.pending_wipes),
2897 )
2898 }
2899
2900 /// Fire deferred sink-fire jobs in collected order, then release the
2901 /// payload handles owed for messages that landed in `pending_notify`
2902 /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
2903 /// hooks. All three phases run lock-released so:
2904 /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
2905 /// state lock cleanly and run their own nested wave.
2906 /// - The binding's `release_handle` path can't deadlock against a
2907 /// binding-side mutex held by Core.
2908 /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
2909 /// may safely re-enter Core for unrelated nodes.
2910 ///
2911 /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
2912 /// is wrapped in `catch_unwind` so a single binding panic doesn't
2913 /// short-circuit the per-wave drain. All queued cleanup attempts run;
2914 /// if any panicked, the LAST panic re-raises after the loop completes
2915 /// (preserving wave-end discipline while still surfacing failures).
2916 /// Per D060, Core stays panic-naive about user code — bindings own
2917 /// their host-language panic policy inside `cleanup_for`; this
2918 /// `catch_unwind` is purely about drain-don't-short-circuit.
2919 pub(crate) fn fire_deferred(
2920 &self,
2921 jobs: DeferredJobs,
2922 releases: Vec<HandleId>,
2923 cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
2924 pending_wipes: Vec<crate::handle::NodeId>,
2925 ) {
2926 // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
2927 // `catch_unwind` so a panicking sink doesn't unwind out of
2928 // `fire_deferred` and drop the queued `releases` +
2929 // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
2930 // handshake-fire discipline. Without this guard, a sink panic
2931 // here would silently leak handle retains AND silently drop
2932 // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
2933 // we re-raise the last panic at the end after running every
2934 // queued fire — drain ordering is preserved.
2935 let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
2936 for (sinks, msgs) in jobs {
2937 for sink in &sinks {
2938 let sink = sink.clone();
2939 let msgs_ref = &msgs;
2940 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2941 sink(msgs_ref);
2942 }));
2943 if let Err(payload) = result {
2944 last_panic = Some(payload);
2945 }
2946 }
2947 }
2948 for h in releases {
2949 self.binding.release_handle(h);
2950 }
2951 // Slice E2 (D060): drain cleanup hooks with per-item panic
2952 // isolation so the loop always completes. AssertUnwindSafe is
2953 // safe here because we don't rely on logical state being valid
2954 // post-panic — the panic propagates anyway after the drain ends.
2955 for (node_id, trigger) in cleanup_hooks {
2956 let binding = &self.binding;
2957 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2958 binding.cleanup_for(node_id, trigger);
2959 }));
2960 if let Err(payload) = result {
2961 last_panic = Some(payload);
2962 }
2963 }
2964 // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
2965 // same per-item panic isolation. Fires AFTER cleanup hooks so a
2966 // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
2967 // fires in the same wave) sees pre-wipe binding state if it
2968 // landed in the same wave as the terminal cascade. Mutually
2969 // exclusive with `Subscription::Drop`'s direct-fire site, but
2970 // even concurrent fires are idempotent (binding's `wipe_ctx`
2971 // calls `HashMap::remove` which is a no-op on absent keys).
2972 for node_id in pending_wipes {
2973 let binding = &self.binding;
2974 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2975 binding.wipe_ctx(node_id);
2976 }));
2977 if let Err(payload) = result {
2978 last_panic = Some(payload);
2979 }
2980 }
2981 if let Some(payload) = last_panic {
2982 std::panic::resume_unwind(payload);
2983 }
2984 }
2985
2986 // -------------------------------------------------------------------
2987 // User-facing batch — coalesce multiple emits into one wave
2988 // -------------------------------------------------------------------
2989
2990 /// Coalesce multiple emissions into a single wave. Every `emit` /
2991 /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
2992 /// queues its downstream work; the wave drains when `f` returns.
2993 ///
2994 /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
2995 /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
2996 /// — repeated emits on the same node coalesce into a single multi-message
2997 /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
2998 /// per emit, all delivered together in the per-node phase-2 pass).
2999 ///
3000 /// Nested `batch()` calls share the outer wave; only the outermost call
3001 /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
3002 /// engine's own `s.in_tick` re-entrance) compose with this method
3003 /// transparently — they observe `in_tick = true` and skip drain just
3004 /// like nested `batch()`.
3005 ///
3006 /// On panic inside `f`, the `BatchGuard` returned by the internal
3007 /// `begin_batch` call drops normally and discards pending tier-3+ work
3008 /// (subscribers do not observe the half-built wave). See
3009 /// [`Core::begin_batch`] for the RAII variant if you need explicit control
3010 /// over the scope boundary.
3011 pub fn batch<F>(&self, f: F)
3012 where
3013 F: FnOnce(),
3014 {
3015 let _guard = self.begin_batch();
3016 f();
3017 }
3018
3019 /// RAII batch handle — opens a wave when constructed, drains on drop.
3020 ///
3021 /// Mirrors the closure-based [`Self::batch`] but exposes the scope
3022 /// boundary so callers can compose batches with non-`FnOnce` control
3023 /// flow (e.g. async-state-machine code paths, or splitting setup and
3024 /// drain across helper functions).
3025 ///
3026 /// ```ignore
3027 /// let g = core.begin_batch();
3028 /// core.emit(state_a, h1);
3029 /// core.emit(state_b, h2);
3030 /// drop(g); // wave drains here
3031 /// ```
3032 ///
3033 /// Like the closure form, nested `begin_batch` calls share the outer
3034 /// wave (only the outermost guard drains).
3035 ///
3036 /// # Panics
3037 ///
3038 /// Panics if the registry-epoch retry-validate loop exceeds
3039 /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations — pathological
3040 /// concurrent `register` / `set_deps` activity racing with
3041 /// closure-form batch entry. Unreachable in correct call paths.
3042 #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3043 pub fn begin_batch(&self) -> BatchGuard {
3044 // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
3045 // seed; per session-doc Q7 / D092 it MUST serialize against every
3046 // currently-existing partition. Acquire each partition's
3047 // `wave_owner` in ascending [`SubgraphId`] order via the retry-
3048 // validate primitive. Same-thread re-entry passes through each
3049 // ReentrantMutex transparently; cross-thread waves on any of the
3050 // touched partitions block until our `wave_guards` drop.
3051 //
3052 // **QA-fix #2 (2026-05-09) — registry epoch retry-validate:** a
3053 // concurrent `register` / `set_deps`-driven union/split between
3054 // our `all_partitions_lock_boxes()` snapshot and the post-
3055 // acquire epoch read changes the partition set. We then retry
3056 // the whole acquire with the new snapshot. Without this, a
3057 // partition added after our snapshot would not be held by our
3058 // batch — breaking the closure-form's "all-partitions
3059 // serialization" contract.
3060 //
3061 // Trade-off (documented v1 contract): closure-form batch is the
3062 // serialization point under per-partition parallelism. Per-seed
3063 // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
3064 // acquire only the touched partitions and run truly parallel
3065 // for disjoint partitions.
3066 for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
3067 let epoch_before = self.registry.lock().epoch();
3068 let partition_boxes = self.all_partitions_lock_boxes();
3069 let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
3070 for (sid, _box) in &partition_boxes {
3071 // Use the partition's root NodeId as the lock_for retry
3072 // seed. SubgraphId.raw() == root NodeId.raw(); the root
3073 // is always registered in the X5 / Phase-E substrate
3074 // (cleanup_node is gated, Phase G activates).
3075 let representative = crate::handle::NodeId::new(sid.raw());
3076 wave_guards.push(self.partition_wave_owner_lock_arc(representative));
3077 }
3078 // Post-acquire epoch read. If unchanged, our snapshot is
3079 // still authoritative — every existing partition was held
3080 // throughout. If changed, drop guards and retry.
3081 let epoch_after = self.registry.lock().epoch();
3082 if epoch_after == epoch_before {
3083 return self.begin_batch_with_guards(wave_guards);
3084 }
3085 // Drop guards lock-released so retries don't accumulate.
3086 drop(wave_guards);
3087 std::thread::yield_now();
3088 }
3089 panic!(
3090 "Core::begin_batch: exceeded {} retries — pathological concurrent \
3091 register/union/split activity racing with closure-form batch entry",
3092 crate::subgraph::MAX_LOCK_RETRIES
3093 );
3094 }
3095
3096 /// Begin a batch scoped to the partitions transitively touched from
3097 /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
3098 /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
3099 /// reachable partition, and acquires each in ascending
3100 /// [`crate::subgraph::SubgraphId`] order via
3101 /// [`Core::partition_wave_owner_lock_arc`].
3102 ///
3103 /// Two threads with disjoint touched-partition sets run truly
3104 /// parallel — the per-partition `wave_owner` mutexes don't block
3105 /// each other. This is the canonical Y1 parallelism win for
3106 /// per-seed wave-driving entry points (subscribe, emit, pause,
3107 /// resume, invalidate, complete, error, teardown,
3108 /// set_deps push-on-subscribe).
3109 ///
3110 /// **QA-fix #2 (2026-05-09):** retry-validate the touched-partition
3111 /// set against the registry epoch — same protection as
3112 /// [`Self::begin_batch`] but scoped to a per-seed touched set
3113 /// rather than every partition. Conservative: any registry
3114 /// mutation (even on a partition unrelated to seed's touched set)
3115 /// triggers a retry. This avoids a precise "did MY touched set
3116 /// change?" check at the cost of occasional spurious retries.
3117 ///
3118 /// # Panics
3119 ///
3120 /// Panics if the registry-epoch retry-validate loop exceeds
3121 /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, OR if
3122 /// [`Core::partition_wave_owner_lock_arc`] panics on an
3123 /// unregistered seed. Both are unreachable in correct call paths
3124 /// (P12 invariant guarantees registry membership matches
3125 /// `s.nodes`).
3126 ///
3127 /// Slice Y1 / Phase E (2026-05-08); QA-fix #2 (2026-05-09).
3128 #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3129 pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
3130 for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
3131 let epoch_before = self.registry.lock().epoch();
3132 let touched = self.compute_touched_partitions(seed);
3133 let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
3134 for sid in &touched {
3135 let representative = crate::handle::NodeId::new(sid.raw());
3136 wave_guards.push(self.partition_wave_owner_lock_arc(representative));
3137 }
3138 let epoch_after = self.registry.lock().epoch();
3139 if epoch_after == epoch_before {
3140 return self.begin_batch_with_guards(wave_guards);
3141 }
3142 drop(wave_guards);
3143 std::thread::yield_now();
3144 }
3145 panic!(
3146 "Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
3147 pathological concurrent register/union/split activity racing \
3148 with per-seed batch entry",
3149 crate::subgraph::MAX_LOCK_RETRIES
3150 );
3151 }
3152
3153 /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
3154 /// with the supplied (already-acquired) partition wave-owner guards.
3155 /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
3156 /// order (the canonical lock-acquisition order) — both
3157 /// [`Self::begin_batch`] (all-partitions) and
3158 /// [`Self::begin_batch_for`] (touched-partitions) construct the
3159 /// vector in that order before calling here.
3160 fn begin_batch_with_guards(
3161 &self,
3162 wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
3163 ) -> BatchGuard {
3164 // /qa F1 reverted (2026-05-10): `in_tick` lives on CoreState
3165 // (per-Core, cross-thread visible). Per-thread placement broke
3166 // cross-Core same-thread BatchGuard isolation — a thread holding
3167 // a live BatchGuard on Core-A and starting a wave on Core-B
3168 // would see ws.in_tick=true (set by Core-A) → Core-B's writes
3169 // get drained by Core-A's binding. Per-Core placement on
3170 // CoreState restores cross-Core isolation. Cost: one state lock
3171 // acquire on outermost batch entry (small; negligible vs the
3172 // wave's drain).
3173 let owns_tick = {
3174 let mut s = self.lock_state();
3175 let was_in = s.in_tick;
3176 if !was_in {
3177 s.in_tick = true;
3178 }
3179 !was_in
3180 };
3181 // D1 patch (2026-05-09): defensive wave-start clear of the
3182 // per-thread Slice G tier3 tracker on outermost owning entry.
3183 // The thread-local is cleared at outermost BatchGuard drop on
3184 // both success + panic paths; this start-clear is belt-and-
3185 // suspenders against panic paths that bypass Drop (catch_unwind
3186 // can interleave with thread reuse — e.g. cargo's test-runner
3187 // thread pool — and propagate stale entries from a prior
3188 // panicked test's wave that didn't fully unwind through
3189 // BatchGuard::drop).
3190 if owns_tick {
3191 tier3_clear();
3192 // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
3193 // clear of WaveState's non-retain-holding fields. Mirrors the
3194 // tier3 defensive-clear above. Retain-holding fields
3195 // (wave_cache_snapshots / deferred_handle_releases) MUST be
3196 // empty here — outermost BatchGuard::drop drains them on both
3197 // success + panic paths.
3198 wave_state_clear_outermost();
3199 }
3200 BatchGuard {
3201 core: self.clone(),
3202 owns_tick,
3203 wave_guards,
3204 _not_send: std::marker::PhantomData,
3205 }
3206 }
3207}
3208
3209/// RAII guard returned by [`Core::begin_batch`].
3210///
3211/// While alive, suppresses per-emit wave drains — multiple `emit` /
3212/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
3213/// wave. On drop:
3214/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
3215/// in-tick).
3216/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
3217/// the in-tick flag): silently no-ops.
3218///
3219/// On thread panic during the closure body, the drop path discards pending
3220/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
3221/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
3222/// State-node cache writes that already executed inside the closure are
3223/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
3224/// panic value. The atomicity guarantee covers both sink-observability and
3225/// cache state.
3226///
3227/// # Thread safety
3228///
3229/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
3230/// per-`Core` `in_tick` flag AND the per-partition `wave_owner`
3231/// re-entrant mutex(es) on the calling thread; sending the guard to
3232/// another thread and dropping it there would clear `in_tick` and
3233/// release the wave-owner guards from a different thread than the
3234/// one that acquired them, breaking both the thread-local "I own
3235/// the wave scope" semantic and `parking_lot::ReentrantMutex`'s
3236/// ownership invariant. The `wave_guards` field is a `SmallVec` of
3237/// `!Send` `ArcReentrantMutexGuard<()>`; the `PhantomData<*const ()>`
3238/// marker is belt-and-suspenders.
3239///
3240/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
3241/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
3242/// `SmallVec` of partition wave-owner guards. Closure-form
3243/// `begin_batch` acquires every current partition (serialization
3244/// point); `begin_batch_for(seed)` acquires only the transitively-
3245/// touched partitions (parallel for disjoint sets).
3246///
3247/// ```compile_fail
3248/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
3249/// use std::sync::Arc;
3250///
3251/// struct Stub;
3252/// impl BindingBoundary for Stub {
3253/// fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3254/// FnResult::Noop { tracked: None }
3255/// }
3256/// fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3257/// fn release_handle(&self, _: HandleId) {}
3258/// }
3259/// fn requires_send<T: Send>(_: T) {}
3260/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3261/// let guard = core.begin_batch();
3262/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
3263/// ```
3264#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3265pub struct BatchGuard {
3266 core: Core,
3267 owns_tick: bool,
3268 /// Re-entrant mutex guards held for the wave's duration. One entry
3269 /// per touched partition's `wave_owner`, in ascending
3270 /// [`crate::subgraph::SubgraphId`] order. Drop releases each guard
3271 /// (any order — `parking_lot::ReentrantMutex` doesn't care since all
3272 /// are held by the same thread). Cross-thread waves on any of the
3273 /// held partitions block until our scope ends; cross-thread waves
3274 /// on partitions NOT in this vector run truly parallel — the
3275 /// canonical Y1 parallelism property.
3276 ///
3277 /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
3278 /// (and thus `BatchGuard`) is `!Send` at the type level — sending
3279 /// across threads would violate `parking_lot::ReentrantMutex`'s
3280 /// thread-ownership invariant.
3281 wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
3282 _not_send: std::marker::PhantomData<*const ()>,
3283}
3284
3285impl Drop for BatchGuard {
3286 fn drop(&mut self) {
3287 if !self.owns_tick {
3288 return;
3289 }
3290 if std::thread::panicking() {
3291 // Discard pending wave work to avoid firing sinks during
3292 // unwind (sink panic during unwind would abort the process).
3293 //
3294 // Refcount discipline: pending_notify entries (or any
3295 // already-collected deferred_handle_releases from a partial
3296 // drain) hold a queue_notify-time retain on every payload
3297 // handle. Release them here so the discard doesn't leak.
3298 //
3299 // Wave-cache snapshots restore pre-panic cache values so the
3300 // atomicity guarantee covers state, not just observability.
3301 let (pending, deferred_releases, restored_releases) = {
3302 let mut s = self.core.lock_state();
3303 // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09):
3304 // WaveState borrowed alongside state for panic-discard
3305 // cleanup. The WaveState borrow is per-thread,
3306 // independent of state. Sub-slice 2 moved pending_fires
3307 // + pending_notify out of CoreState into WaveState;
3308 // sub-slice 3 moved deferred_flush_jobs + deferred_cleanup_hooks
3309 // + pending_wipes + invalidate_hooks_fired_this_wave;
3310 // /qa F1+F2 (2026-05-10) reverted in_tick + currently_firing
3311 // back to CoreState (cross-Core / cross-thread visibility
3312 // required) — those clear paths are in CoreState::clear_wave_state.
3313 let result = with_wave_state(|ws| {
3314 let pending = std::mem::take(&mut ws.pending_notify);
3315 let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
3316 ws.pending_fires.clear();
3317 let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
3318 // clear_wave_state pushes batch-handle releases into
3319 // ws.deferred_handle_releases, so take ws's queue AFTER
3320 // the clear.
3321 s.clear_wave_state(ws);
3322 ws.clear_wave_state();
3323 let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
3324 // Slice E2 (D061): panic-discard wave drops queued
3325 // OnInvalidate cleanup hooks SILENTLY. Bindings using
3326 // OnInvalidate for external-resource cleanup MUST
3327 // idempotent-cleanup at process exit / next successful
3328 // invalidate. Mirrors A3 `pending_pause_overflow`
3329 // panic-discard precedent.
3330 let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
3331 std::mem::take(&mut ws.deferred_cleanup_hooks);
3332 // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
3333 // for the eager-wipe queue. A panic-discarded wave drops
3334 // queued `wipe_ctx` fires silently; the binding-side
3335 // `NodeCtxState` entry remains until the next successful
3336 // terminate-with-no-subs cycle (or until `Core` drops).
3337 // This mirrors D061's external-resource-cleanup gap and
3338 // is documented similarly.
3339 let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
3340 (pending, deferred_releases, restored)
3341 });
3342 // /qa F1 reverted (2026-05-10): in_tick lives on CoreState.
3343 // Cleared after the WaveState borrow drops; lock is still
3344 // held, so cross-thread observers see in_tick=false
3345 // atomic with the wave-state cleanup completing.
3346 s.in_tick = false;
3347 result
3348 };
3349 // Lock dropped — release retains lock-released so the binding
3350 // can't deadlock against an internal binding mutex.
3351 for entry in pending.values() {
3352 for msg in entry.iter_messages() {
3353 if let Some(h) = msg.payload_handle() {
3354 self.core.binding.release_handle(h);
3355 }
3356 }
3357 }
3358 for h in deferred_releases {
3359 self.core.binding.release_handle(h);
3360 }
3361 for h in restored_releases {
3362 self.core.binding.release_handle(h);
3363 }
3364 // D1 patch (2026-05-09): clear the per-thread Slice G tier3
3365 // tracker on outermost wave-end (panic-discard path). The
3366 // thread-local outlives the BatchGuard otherwise — cargo's
3367 // thread reuse across tests would propagate stale entries.
3368 tier3_clear();
3369 return;
3370 }
3371 // Successful drain — drain_and_flush manages its own locking.
3372 self.core.drain_and_flush();
3373 // Wave cleanup + extract deferred jobs under the lock.
3374 let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
3375 let mut s = self.core.lock_state();
3376 // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState
3377 // borrowed alongside state for wave-end cleanup. Per-thread;
3378 // independent of state. Sub-slice 3 moved deferred_* drains
3379 // into WaveState. /qa F1+F2 (2026-05-10) reverted in_tick +
3380 // currently_firing back to CoreState — clear via
3381 // CoreState::clear_wave_state under the held state lock.
3382 let result = with_wave_state(|ws| {
3383 s.clear_wave_state(ws);
3384 ws.clear_wave_state();
3385 // /qa A1 (2026-05-09) discipline preserved: drain snapshot
3386 // retains under lock, release lock-released below to avoid
3387 // binding re-entrance under held mutex / borrow.
3388 let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
3389 // `drain_deferred` takes `deferred_flush_jobs` +
3390 // `deferred_handle_releases` (incl. rotation releases pushed
3391 // by `clear_wave_state` above) + Slice E2
3392 // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
3393 // `pending_wipes` — all from WaveState post-Sub-slice-3.
3394 let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
3395 (jobs, releases, hooks, wipes, snapshot_releases)
3396 });
3397 // /qa F1 reverted (2026-05-10): in_tick lives on CoreState.
3398 // Cleared after the WaveState borrow drops; lock still held.
3399 s.in_tick = false;
3400 result
3401 };
3402 // Lock dropped — fire deferred sinks + release retains + fire
3403 // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
3404 // + fire eager wipes (D069).
3405 self.core
3406 .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
3407 // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
3408 // lock-released. Pre-A1 these were released inside the held
3409 // state + cross_partition locks; binding finalizers re-entering
3410 // Core would deadlock against either mutex. Drained earlier
3411 // under the lock; released here after both mutexes dropped and
3412 // sinks have fired.
3413 for h in snapshot_releases {
3414 self.core.binding.release_handle(h);
3415 }
3416 // D1 patch (2026-05-09): clear the per-thread Slice G tier3
3417 // tracker at outermost wave-end (success path). Mirrors the
3418 // panic-discard branch above. Thread-local outlives BatchGuard
3419 // by default; cargo's thread-reuse across tests would propagate
3420 // stale entries. Cleared after sinks fire (sink callbacks may
3421 // re-enter Core via emit and could read the tier3 set
3422 // mid-wave; the wave is over here so clearing is safe).
3423 tier3_clear();
3424 // QA-fix group 2 (2026-05-09): explicitly drop the wave guards
3425 // in REVERSE acquisition order. `parking_lot::ReentrantMutex`
3426 // doesn't care about release order for same-thread holders, but
3427 // a future migration to a non-reentrant lock (or one with a
3428 // Drop side-effect tied to ordering) would silently break if we
3429 // relied on `SmallVec`'s default forward-iteration drop. The
3430 // ascending-acquire / descending-release pattern is the
3431 // canonical lock-discipline shape.
3432 while let Some(guard) = self.wave_guards.pop() {
3433 drop(guard);
3434 }
3435 }
3436}