use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use ahash::AHashSet;
use indexmap::map::Entry;
use indexmap::IndexMap;
use smallvec::SmallVec;
use crate::boundary::{DepBatch, FnEmission, FnResult};
use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
use crate::message::Message;
use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
// Per-thread set of node ids recorded by `tier3_mark`, queried by `tier3_check`
// and emptied by `tier3_clear`.
thread_local! {
static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}
// Per-thread `WaveState` instance; all access goes through `with_wave_state`.
thread_local! {
static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
}
// Per-thread set of `u64` ids.
// NOTE(review): not referenced in the visible part of this file; presumably tracks
// which cores own the in-progress tick on this thread — confirm against its users.
thread_local! {
static IN_TICK_OWNED: RefCell<AHashSet<u64>> = RefCell::new(AHashSet::new());
}
/// Per-thread bookkeeping for the wave being processed (held in `WAVE_STATE`).
pub(crate) struct WaveState {
/// Handles whose release is postponed; `push_replay_buffer` queues evicted
/// replay-buffer handles here.
pub(crate) deferred_handle_releases: Vec<HandleId>,
/// Per node, the cache handle recorded the first time a commit path touches the
/// node while `in_tick()` is true. Consumed by `drain_wave_cache_snapshots` /
/// `restore_wave_cache_snapshots`.
pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
/// Children marked dirty by a RESOLVED emission in `commit_emission`;
/// `drain_and_flush` queues `Message::Resolved` for those that end the drain
/// without a tier >= 3 message.
pub(crate) pending_auto_resolve: AHashSet<NodeId>,
/// Pause-overflow records that `drain_and_flush` turns into synthesized errors
/// once no fires are pending.
pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
/// Nodes waiting to be fired by the drain loop.
pub(crate) pending_fires: AHashSet<NodeId>,
/// Queued notification batches, keyed by node in insertion order.
pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
/// Cleared at the start of each outermost wave.
/// NOTE(review): presumably de-duplicates invalidate hooks per wave — confirm.
pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
/// Sink/message jobs deferred for later delivery (see `DeferredJobs`).
pub(crate) deferred_flush_jobs: DeferredJobs,
/// Cleanup hooks (node + trigger) deferred for later execution.
pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
/// Nodes queued for a wipe. NOTE(review): the consumer is outside this view.
pub(crate) pending_wipes: Vec<NodeId>,
}
impl WaveState {
    /// Creates a wave state in which every queue, set and map starts out empty.
    fn new() -> Self {
        Self {
            // Work queues driving the drain loop.
            pending_fires: AHashSet::new(),
            pending_notify: IndexMap::new(),
            pending_auto_resolve: AHashSet::new(),
            pending_pause_overflow: Vec::new(),
            pending_wipes: Vec::new(),
            // Handle bookkeeping.
            wave_cache_snapshots: HashMap::new(),
            deferred_handle_releases: Vec::new(),
            // Deferred side effects.
            deferred_flush_jobs: Vec::new(),
            deferred_cleanup_hooks: Vec::new(),
            invalidate_hooks_fired_this_wave: AHashSet::new(),
        }
    }

    /// Empties the three collections that are reset at wave boundaries:
    /// auto-resolve candidates, pause-overflow records and the
    /// invalidate-hook markers. All other fields are left untouched.
    pub(crate) fn clear_wave_state(&mut self) {
        self.invalidate_hooks_fired_this_wave.clear();
        self.pending_pause_overflow.clear();
        self.pending_auto_resolve.clear();
    }
}
/// Runs `f` with exclusive access to this thread's [`WaveState`] and returns its result.
///
/// # Panics
/// Panics if the state is already borrowed, i.e. if `f` re-enters `with_wave_state`.
pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
    WAVE_STATE.with(|cell| {
        let mut state = cell.borrow_mut();
        f(&mut state)
    })
}
/// Resets the per-wave scratch collections at the start of an outermost wave.
///
/// In debug builds it first checks that the previous wave left no cache snapshots,
/// deferred handle releases or pending notifications behind.
fn wave_state_clear_outermost() {
    with_wave_state(|ws| {
        debug_assert!(
            ws.wave_cache_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain (would leak retains into next wave's \
             binding). See /qa F4 (2026-05-10).",
            ws.wave_cache_snapshots.len()
        );
        debug_assert!(
            ws.deferred_handle_releases.is_empty(),
            "wave_state_clear_outermost: deferred_handle_releases non-empty \
             at outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.deferred_handle_releases.len()
        );
        debug_assert!(
            ws.pending_notify.is_empty(),
            "wave_state_clear_outermost: pending_notify non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.pending_notify.len()
        );
        // Clears pending_auto_resolve, pending_pause_overflow and
        // invalidate_hooks_fired_this_wave.
        ws.clear_wave_state();
    });
}
/// Cached partition list for one seed node, stored per thread in `PARTITION_CACHE`.
/// NOTE(review): the code that fills and validates this cache is outside this view;
/// the field notes below are inferred from names and types — confirm.
struct PartitionCache {
// Presumably identifies the `Core` instance/generation the entry was computed for.
core_generation: u64,
// Seed node the partitions were computed from.
seed: NodeId,
// Presumably a topology epoch used to invalidate the entry.
epoch: u64,
// Subgraph ids associated with `seed`; inline storage for up to four.
partitions: SmallVec<[crate::subgraph::SubgraphId; 4]>,
}
// Per-thread, single-entry partition cache; starts out as `None`.
thread_local! {
static PARTITION_CACHE: RefCell<Option<PartitionCache>> = const { RefCell::new(None) };
}
/// Returns `true` if `node` has been recorded by `tier3_mark` on this thread
/// since the last `tier3_clear`.
fn tier3_check(node: NodeId) -> bool {
    TIER3_EMITTED_THIS_WAVE.with(|emitted| {
        let set = emitted.borrow();
        set.contains(&node)
    })
}
/// Records `node` in this thread's tier-3 set; marking a node twice is harmless.
fn tier3_mark(node: NodeId) {
    TIER3_EMITTED_THIS_WAVE.with(|emitted| {
        let mut set = emitted.borrow_mut();
        // The "was it new?" result is irrelevant here.
        let _newly_added = set.insert(node);
    });
}
/// Forgets every node recorded by `tier3_mark` on this thread.
fn tier3_clear() {
    TIER3_EMITTED_THIS_WAVE.with(|emitted| {
        let mut set = emitted.borrow_mut();
        set.clear();
    });
}
/// A list of deferred delivery jobs: each job pairs a set of sinks with the
/// messages queued for them.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
/// Bundle of deferred work carried out of a wave, in order: flush jobs, handles,
/// cleanup hooks (node + trigger), and node ids.
/// NOTE(review): the element types mirror `WaveState::deferred_flush_jobs`,
/// `deferred_handle_releases`, `deferred_cleanup_hooks` and `pending_wipes`;
/// presumably that is what each slot carries — confirm at the producer.
pub(crate) type WaveDeferred = (
DeferredJobs,
Vec<HandleId>,
Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
Vec<crate::handle::NodeId>,
);
/// One group of messages queued for a fixed set of sinks.
pub(crate) struct PendingBatch {
// NOTE(review): presumably the subscriber-list revision at the time `sinks`
// was captured — confirm where batches are created.
pub(crate) snapshot_revision: u64,
// Sinks that will receive `messages`; inline storage for one.
pub(crate) sinks: SmallVec<[Sink; 1]>,
// Messages queued for those sinks; inline storage for three.
pub(crate) messages: SmallVec<[Message; 3]>,
}
/// All notification batches queued for a single node (the value type of
/// `WaveState::pending_notify`).
pub(crate) struct PendingPerNode {
// Batches in queue order; inline storage for one.
pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}
impl PendingPerNode {
pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
self.batches.iter().flat_map(|b| b.messages.iter())
}
pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
}
}
/// Scope guard that keeps `node_id` on the core's `currently_firing` list for as
/// long as it is alive: pushed in `FiringGuard::new`, removed again on drop.
pub(crate) struct FiringGuard {
// Clone of the core whose state holds the `currently_firing` list.
core: Core,
// Node registered as firing by this guard.
node_id: NodeId,
}
impl FiringGuard {
    /// Registers `node_id` as currently firing on `core` and returns the guard
    /// that undoes the registration when dropped.
    ///
    /// The state lock is only held while pushing onto the list.
    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
        core.lock_state().currently_firing.push(node_id);
        Self {
            node_id,
            core: core.clone(),
        }
    }
}
impl Drop for FiringGuard {
    /// Removes the most recent `currently_firing` entry for this guard's node, if any.
    fn drop(&mut self) {
        let target = self.node_id;
        let mut state = self.core.lock_state();
        // Search from the back so the latest registration of this node is the one removed.
        let last_match = state.currently_firing.iter().rposition(|&id| id == target);
        if let Some(idx) = last_match {
            state.currently_firing.swap_remove(idx);
        }
    }
}
/// Borrows the operator scratch state of `node_id`, downcast to `T`.
///
/// # Panics
/// Panics if the node has no scratch slot or the slot holds a different type.
fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
    let slot = s.require_node(node_id).op_scratch.as_ref();
    let scratch = slot.expect("op_scratch slot uninitialized for operator node");
    scratch
        .as_any_ref()
        .downcast_ref::<T>()
        .expect("op_scratch type mismatch")
}
/// Mutably borrows the operator scratch state of `node_id`, downcast to `T`.
///
/// # Panics
/// Panics if the node has no scratch slot or the slot holds a different type.
fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
    let slot = s.require_node_mut(node_id).op_scratch.as_mut();
    let scratch = slot.expect("op_scratch slot uninitialized for operator node");
    scratch
        .as_any_mut()
        .downcast_mut::<T>()
        .expect("op_scratch type mismatch")
}
impl Core {
/// Opens a batch for `seed`, runs `op` inside it, and closes the batch when `op`
/// returns (or unwinds).
pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
where
    F: FnOnce(&Self),
{
    let batch_guard = self.begin_batch_for(seed);
    op(self);
    // The batch ends here, after `op` has finished.
    drop(batch_guard);
}
/// Fallible variant of `run_wave_for`: opens a batch for `seed` via
/// `try_begin_batch_for`, runs `op` inside it, and closes the batch afterwards.
///
/// # Errors
/// Returns the error from `try_begin_batch_for` without running `op`.
pub(crate) fn try_run_wave_for<F>(
    &self,
    seed: crate::handle::NodeId,
    op: F,
) -> Result<(), crate::node::PartitionOrderViolation>
where
    F: FnOnce(&Self),
{
    let batch_guard = self.try_begin_batch_for(seed)?;
    op(self);
    // The batch ends here, after `op` has finished.
    drop(batch_guard);
    Ok(())
}
/// Empties `ws.wave_cache_snapshots` and returns the snapshotted handles
/// (node ids are discarded). Returns an empty vector when there is nothing to drain.
#[must_use]
pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
    let snapshots = &mut ws.wave_cache_snapshots;
    if snapshots.is_empty() {
        Vec::new()
    } else {
        std::mem::take(snapshots).into_values().collect()
    }
}
/// Puts every snapshotted handle back into its node's cache and returns the
/// handles that were displaced by doing so.
///
/// For a node that no longer exists, the snapshot handle itself is returned.
/// A displaced cache value of `NO_HANDLE` is not reported.
/// `ws.wave_cache_snapshots` is left empty.
pub(crate) fn restore_wave_cache_snapshots(
    &self,
    s: &mut CoreState,
    ws: &mut WaveState,
) -> Vec<HandleId> {
    if ws.wave_cache_snapshots.is_empty() {
        return Vec::new();
    }
    let taken = std::mem::take(&mut ws.wave_cache_snapshots);
    let mut to_release = Vec::with_capacity(taken.len());
    for (node_id, saved) in taken {
        match s.nodes.get_mut(&node_id) {
            // Node is gone: nothing to restore into, hand the snapshot back.
            None => to_release.push(saved),
            Some(rec) => {
                let displaced = std::mem::replace(&mut rec.cache, saved);
                if displaced != NO_HANDLE {
                    to_release.push(displaced);
                }
            }
        }
    }
    to_release
}
/// Drains the current wave and flushes the queued notifications.
///
/// Each loop iteration (1) turns queued pause-overflow records into error
/// emissions, but only while no fires are pending, then (2) picks one pending
/// node via `pick_next_fire` and fires it. The loop ends when `pending_fires`
/// is empty. Afterwards, every auto-resolve candidate that has queued
/// notifications but no tier >= 3 message gets a `Message::Resolved`, and
/// `flush_notifications` is called.
///
/// # Panics
/// Panics once the number of loop iterations reaches the state's
/// `max_batch_drain_iterations`.
pub(crate) fn drain_and_flush(&self) {
// Counts loop iterations that found pending fires.
let mut guard = 0u32;
loop {
// Take the overflow records only when nothing is waiting to fire.
let synth_pending = with_wave_state(|ws| {
if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
std::mem::take(&mut ws.pending_pause_overflow)
} else {
Vec::new()
}
});
for entry in synth_pending {
let handle = self.binding.synthesize_pause_overflow_error(
entry.node_id,
entry.dropped_count,
entry.configured_max,
// Scaled down by 1_000_000 (ns -> ms, going by the field name).
entry.lock_held_ns / 1_000_000,
);
// The binding may decline to produce an error handle.
if let Some(h) = handle {
self.error(entry.node_id, h);
}
}
// Select the next node under the state lock; the lock is released before firing.
let (next, cap, pending_size) = {
let s = self.lock_state();
let cap = s.max_batch_drain_iterations;
let (next, pending_size) = with_wave_state(|ws| {
if ws.pending_fires.is_empty() {
return (None, 0);
}
let size = ws.pending_fires.len();
let next = Self::pick_next_fire(&s, ws);
(next, size)
});
if pending_size == 0 {
break;
}
(next, cap, pending_size)
};
guard += 1;
assert!(
guard < cap,
"wave drain exceeded {cap} iterations \
(pending_fires={pending_size}). Most likely cause: a runtime \
cycle introduced by an operator that re-arms its own pending_fires \
slot from inside `invoke_fn` (e.g. a producer that subscribes to \
itself, or a fn that calls Core::emit on a node whose fn fires \
the original node again). Structural cycles via set_deps are \
rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
only with concrete evidence the workload needs more iterations."
);
let Some(next) = next else { break };
self.fire_fn(next);
}
let mut s = self.lock_state();
// Auto-resolve pass: candidates are taken out of the wave state first so the
// set is empty afterwards.
let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
for node_id in candidates {
// Only nodes with queued notifications and no tier >= 3 message among them.
let needs_resolve = with_wave_state(|ws| {
ws.pending_notify
.get(&node_id)
.is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
});
if needs_resolve {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
}
self.flush_notifications(&mut s);
}
/// Chooses the pending node with the smallest `topo_rank`; a pending id with no
/// node record counts as rank 0. Returns `None` when nothing is pending.
fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
    let rank_of = |id: NodeId| match s.nodes.get(&id) {
        Some(rec) => rec.topo_rank,
        None => 0,
    };
    ws.pending_fires
        .iter()
        .copied()
        .min_by_key(|&id| rank_of(id))
}
/// Fires `node_id`: operator nodes go through `fire_operator`, everything else
/// (including unknown ids) through `fire_regular`.
pub(crate) fn fire_fn(&self, node_id: NodeId) {
    // Read the operator tag under the lock, then dispatch with the lock released.
    let operator = {
        let state = self.lock_state();
        match state.nodes.get(&node_id) {
            Some(rec) => rec.op,
            None => None,
        }
    };
    if let Some(operator_op) = operator {
        self.fire_operator(node_id, operator_op);
    } else {
        self.fire_regular(node_id);
    }
}
/// Fires a non-operator node: snapshots its dependency batches, invokes its
/// function through the binding, and commits whatever the function produced.
///
/// Phases:
/// 1. Under the state lock: drop the node from `pending_fires`; bail out if it is
///    terminal, or not `partial` while it still has sentinel deps, or has no fn.
/// 2. Lock released: run the `OnRerun` cleanup if the node fired before, then call
///    `invoke_fn` while a `FiringGuard` is alive.
/// 3. Under the lock again: if the node became terminal meanwhile, release every
///    handle in the result and stop; otherwise record `has_fired_once`, update
///    `tracked` for dynamic nodes, and translate the result into an action.
/// 4. Lock released: commit the action.
#[allow(clippy::too_many_lines)] fn fire_regular(&self, node_id: NodeId) {
// What to commit once the state lock has been released.
enum FireAction {
None,
SingleData(HandleId),
Batch(SmallVec<[FnEmission; 2]>),
}
// (fn id, per-dep input snapshot, is_dynamic, has_fired_once) or None to skip.
let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
let s = self.lock_state();
with_wave_state(|ws| {
ws.pending_fires.remove(&node_id);
});
let rec = s.require_node(node_id);
if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
None
} else {
rec.fn_id.map(|fn_id| {
// With at most 64 deps the `involved_mask` bits are used (bit i = dep i);
// beyond that the per-dep flag is read instead.
let use_mask = rec.dep_records.len() <= 64;
let mask = rec.involved_mask;
let dep_batches: Vec<DepBatch> = rec
.dep_records
.iter()
.enumerate()
.map(|(i, dr)| DepBatch {
data: dr.data_batch.clone(),
prev_data: dr.prev_data,
involved: if use_mask {
(mask >> i) & 1 != 0
} else {
dr.involved_this_wave
},
})
.collect();
(fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
})
}
};
let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
return;
};
// Re-run: give the binding a chance to clean up the previous run first.
if has_fired_once {
self.binding
.cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
}
let result = {
// Keeps the node on `currently_firing` for the duration of the call.
let _firing = FiringGuard::new(self, node_id);
self.binding.invoke_fn(node_id, fn_id, &dep_batches)
};
let action: FireAction = {
let mut s = self.lock_state();
// The node may have terminated while the fn ran; its output is then discarded
// and the handles it carries are released.
if s.require_node(node_id).terminal.is_some() {
match &result {
FnResult::Data { handle, .. } => {
self.binding.release_handle(*handle);
}
FnResult::Batch { emissions, .. } => {
for em in emissions {
match em {
FnEmission::Data(h) | FnEmission::Error(h) => {
self.binding.release_handle(*h);
}
FnEmission::Complete => {}
}
}
}
FnResult::Noop { .. } => {}
}
return;
}
let rec = s.require_node_mut(node_id);
rec.has_fired_once = true;
// Dynamic nodes report which deps they tracked during this run.
if is_dynamic {
let tracked = match &result {
FnResult::Data { tracked, .. }
| FnResult::Noop { tracked }
| FnResult::Batch { tracked, .. } => tracked.clone(),
};
if let Some(t) = tracked {
rec.tracked = t.into_iter().collect();
}
}
match result {
FnResult::Noop { .. } => {
// Nothing emitted: if the node was already marked dirty and has no tier-3
// message queued yet, settle it with RESOLVED.
let already_dirty = s.require_node(node_id).dirty;
let already_tier3 = with_wave_state(|ws| {
ws.pending_notify
.get(&node_id)
.is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
});
if already_dirty && !already_tier3 {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
FireAction::None
}
FnResult::Data { handle, .. } => FireAction::SingleData(handle),
// An empty batch is treated exactly like `Noop`.
FnResult::Batch { emissions, .. } if emissions.is_empty() => {
let already_dirty = s.require_node(node_id).dirty;
let already_tier3 = with_wave_state(|ws| {
ws.pending_notify
.get(&node_id)
.is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
});
if already_dirty && !already_tier3 {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
FireAction::None
}
FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
}
};
// Commit with the state lock released; the commit paths take it themselves.
match action {
FireAction::None => {}
FireAction::SingleData(handle) => {
self.commit_emission(node_id, handle);
}
FireAction::Batch(emissions) => {
self.commit_batch(node_id, emissions);
}
}
}
fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
let mut iter = emissions.into_iter();
for em in iter.by_ref() {
match em {
FnEmission::Data(handle) => {
self.commit_emission_verbatim(node_id, handle);
}
FnEmission::Complete => {
self.complete(node_id);
break;
}
FnEmission::Error(handle) => {
self.error(node_id, handle);
break;
}
}
}
for em in iter {
match em {
FnEmission::Data(h) | FnEmission::Error(h) => {
self.binding.release_handle(h);
}
FnEmission::Complete => {}
}
}
}
/// Commits `new_handle` as this node's emission, choosing between DATA and
/// RESOLVED by comparing it with the cached handle (per the node's `equals` mode).
///
/// Ownership: the caller hands over one share of `new_handle`. That share
/// * moves into the node cache on the DATA path,
/// * is forwarded to `commit_emission_verbatim` when the node already emitted a
///   tier-3 message in this wave, or
/// * is released here when the node is terminal or when the value equals the
///   cache (RESOLVED), since nothing keeps the handle in those cases.
///
/// The equality check runs with the state lock released, so the terminal check is
/// repeated after the lock is re-acquired.
///
/// # Panics
/// Panics if `new_handle` is `NO_HANDLE`.
#[allow(clippy::too_many_lines)]
pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
    assert!(
        new_handle != NO_HANDLE,
        "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
    );
    // Phase 1: read what the equality check needs, under the lock.
    let snapshot = {
        let s = self.lock_state();
        let rec = s.require_node(node_id);
        if rec.terminal.is_some() {
            drop(s);
            self.binding.release_handle(new_handle);
            return;
        }
        (rec.cache, rec.equals)
    };
    let (old_handle, equals_mode) = snapshot;
    // A node that already emitted tier-3 this wave skips the equality check: any
    // earlier RESOLVED is rewritten to DATA and the new value is committed as-is.
    let is_subsequent_emit_in_wave = tier3_check(node_id);
    if is_subsequent_emit_in_wave {
        self.rewrite_prior_resolved_to_data(node_id);
        self.commit_emission_verbatim(node_id, new_handle);
        return;
    }
    // Phase 2: equality check with the lock released (may call into the binding).
    let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
    // Phase 3: re-acquire and apply.
    let mut s = self.lock_state();
    if s.require_node(node_id).terminal.is_some() {
        drop(s);
        self.binding.release_handle(new_handle);
        return;
    }
    let already_dirty = s.require_node(node_id).dirty;
    s.require_node_mut(node_id).dirty = true;
    if !already_dirty {
        self.queue_notify(&mut s, node_id, Message::Dirty);
    }
    if is_data {
        let current_cache = s.require_node(node_id).cache;
        let in_tick = self.in_tick();
        // Inside a tick, the first displaced cache value of the wave is kept as the
        // wave snapshot; the snapshot then takes over the cache's share of it.
        let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
            use std::collections::hash_map::Entry;
            with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
                Entry::Vacant(slot) => {
                    slot.insert(current_cache);
                    true
                }
                Entry::Occupied(_) => false,
            })
        } else {
            false
        };
        s.require_node_mut(node_id).cache = new_handle;
        // No snapshot took the old cache value, so its share is released.
        if current_cache != NO_HANDLE && !snapshot_taken {
            self.binding.release_handle(current_cache);
        }
        self.push_replay_buffer(&mut s, node_id, new_handle);
        tier3_mark(node_id);
        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
        let child_ids: Vec<NodeId> = s
            .children
            .get(&node_id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        for child_id in child_ids {
            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
            if let Some(idx) = dep_idx {
                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
            }
        }
    } else {
        // RESOLVED: the cache keeps its existing handle and `Message::Resolved`
        // carries no payload, so nothing takes over the caller's share of
        // `new_handle`. Release it here, exactly as the terminal exits above do;
        // otherwise every equal emission leaks one share.
        self.binding.release_handle(new_handle);
        let current_cache = s.require_node(node_id).cache;
        if self.in_tick() && current_cache != NO_HANDLE {
            use std::collections::hash_map::Entry;
            with_wave_state(|ws| {
                if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
                    // The cache keeps its own share, so the snapshot needs a separate one.
                    self.binding.retain_handle(current_cache);
                    slot.insert(current_cache);
                }
            });
        }
        tier3_mark(node_id);
        self.queue_notify(&mut s, node_id, Message::Resolved);
        let child_ids: Vec<NodeId> = s
            .children
            .get(&node_id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        // Children not yet involved in this wave are marked dirty and remembered
        // as auto-resolve candidates for the end of the drain.
        let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
        for child_id in child_ids {
            let already_involved = s.require_node(child_id).involved_this_wave;
            if !already_involved {
                {
                    let child = s.require_node_mut(child_id);
                    child.involved_this_wave = true;
                    child.dirty = true;
                }
                self.queue_notify(&mut s, child_id, Message::Dirty);
                auto_resolve_inserts.push(child_id);
            }
        }
        if !auto_resolve_inserts.is_empty() {
            with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
        }
    }
}
/// Replaces every `Message::Resolved` already queued for `node_id` in this wave
/// with `Message::Data(snapshot)`, where `snapshot` is the node's wave cache
/// snapshot. Both the pending-notify batches and, if the node is paused, its
/// pause buffer are rewritten. Does nothing when the node has no snapshot.
///
/// One extra share of the snapshot handle is retained per rewritten message.
fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
let mut s = self.lock_state();
let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
Some(h) if h != NO_HANDLE => h,
_ => return,
};
// Number of messages rewritten, i.e. shares of `snapshot` to acquire below.
let mut retains_needed = 0u32;
with_wave_state(|ws| {
if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
for msg in entry.iter_messages_mut() {
if matches!(msg, Message::Resolved) {
*msg = Message::Data(snapshot);
retains_needed += 1;
}
}
}
});
if let Some(rec) = s.nodes.get_mut(&node_id) {
if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
for msg in &mut *buffer {
if matches!(msg, Message::Resolved) {
*msg = Message::Data(snapshot);
retains_needed += 1;
}
}
}
}
// The retains are issued only after the state lock has been released.
drop(s);
for _ in 0..retains_needed {
self.binding.retain_handle(snapshot);
}
}
/// Decides whether handles `a` and `b` denote equal values under `mode`.
///
/// Identical handles are always equal; if exactly one side is `NO_HANDLE` they
/// are never equal. Otherwise `Identity` mode says "different" and `Custom` mode
/// asks the binding. Going by the name, callers are expected not to hold the
/// state lock, since the custom comparison calls into the binding.
fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
    if a == b {
        true
    } else if a == NO_HANDLE || b == NO_HANDLE {
        false
    } else {
        match mode {
            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
            EqualsMode::Identity => false,
        }
    }
}
/// Commits `new_handle` as DATA for `node_id` without any equality check.
///
/// Marks the node dirty (queueing `Dirty` on the first transition), swaps the
/// cache to `new_handle`, records the value in the replay buffer, queues
/// `Data(new_handle)` and delivers it to every child that lists this node as a dep.
/// If the node is already terminal, `new_handle` is released and nothing else happens.
///
/// # Panics
/// Panics if `new_handle` is `NO_HANDLE`.
fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
assert!(
new_handle != NO_HANDLE,
"NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
);
let mut s = self.lock_state();
let rec = s.require_node(node_id);
if rec.terminal.is_some() {
drop(s);
self.binding.release_handle(new_handle);
return;
}
let already_dirty = s.require_node(node_id).dirty;
s.require_node_mut(node_id).dirty = true;
if !already_dirty {
self.queue_notify(&mut s, node_id, Message::Dirty);
}
let current_cache = s.require_node(node_id).cache;
// Inside a tick, the first displaced cache value of the wave becomes the wave
// snapshot; in that case the old handle is kept rather than released below.
let snapshot_taken = if self.in_tick() && current_cache != NO_HANDLE {
use std::collections::hash_map::Entry;
with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
Entry::Vacant(slot) => {
slot.insert(current_cache);
true
}
Entry::Occupied(_) => false,
})
} else {
false
};
s.require_node_mut(node_id).cache = new_handle;
if current_cache != NO_HANDLE && !snapshot_taken {
self.binding.release_handle(current_cache);
}
self.push_replay_buffer(&mut s, node_id, new_handle);
tier3_mark(node_id);
self.queue_notify(&mut s, node_id, Message::Data(new_handle));
// Copy the child ids out first so `s` can be borrowed mutably while delivering.
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
}
}
}
/// Appends `new_handle` to the node's replay buffer, if the node has one.
///
/// Nodes without a positive `replay_buffer_cap` are skipped. The buffer takes its
/// own share of the handle; when the push exceeds the cap, the oldest entry is
/// evicted and queued in `deferred_handle_releases`.
fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
    let rec = s.require_node_mut(node_id);
    let Some(cap) = rec.replay_buffer_cap.filter(|&c| c > 0) else {
        return;
    };
    self.binding.retain_handle(new_handle);
    rec.replay_buffer.push_back(new_handle);
    if rec.replay_buffer.len() <= cap {
        return;
    }
    // Over capacity: drop the oldest entry and defer the release of its share.
    if let Some(evicted) = rec.replay_buffer.pop_front() {
        with_wave_state(|ws| ws.deferred_handle_releases.push(evicted));
    }
}
/// Fires an operator node by dispatching to the `fire_op_*` routine matching `op`.
///
/// The node is first removed from `pending_fires`. It is skipped when it is
/// terminal, or when it is not `partial` and has no real input yet. "Real input"
/// means either no sentinel deps remain or at least one dep has terminated.
/// A `FiringGuard` is held for the whole dispatch.
fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
let proceed = {
let s = self.lock_state();
with_wave_state(|ws| {
ws.pending_fires.remove(&node_id);
});
let rec = s.require_node(node_id);
if rec.terminal.is_some() {
false
} else {
// A terminated dep counts as input even while other deps are still sentinels.
let has_real_input = !rec.has_sentinel_deps()
|| rec.dep_records.iter().any(|dr| dr.terminal.is_some());
rec.partial || has_real_input
}
};
if !proceed {
return;
}
let _firing = FiringGuard::new(self, node_id);
match op {
OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
OperatorOp::DistinctUntilChanged { equals_fn_id } => {
self.fire_op_distinct(node_id, equals_fn_id);
}
OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
OperatorOp::WithLatestFrom { pack_fn } => {
self.fire_op_with_latest_from(node_id, pack_fn);
}
OperatorOp::Merge => self.fire_op_merge(node_id),
OperatorOp::Take { count } => self.fire_op_take(node_id, count),
OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
OperatorOp::Last { .. } => self.fire_op_last(node_id),
OperatorOp::Tap { fn_id } => self.fire_op_tap(node_id, fn_id),
OperatorOp::TapFirst { fn_id } => self.fire_op_tap_first(node_id, fn_id),
OperatorOp::Valve => self.fire_op_valve(node_id),
OperatorOp::Settle {
quiet_waves,
max_waves,
} => self.fire_op_settle(node_id, quiet_waves, max_waves),
}
}
/// Copies out the first dependency's data batch and terminal marker for `node_id`.
///
/// # Panics
/// Panics if the node has no deps (checked by a debug assertion, then by indexing).
fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
    let state = self.lock_state();
    let deps = &state.require_node(node_id).dep_records;
    debug_assert!(!deps.is_empty(), "transform operator must have ≥1 dep");
    let first = &deps[0];
    let batch: Vec<HandleId> = first.data_batch.iter().copied().collect();
    (batch, first.terminal)
}
/// Settles a node that fired without emitting data.
///
/// Marks it dirty (queueing `Dirty` on the first transition) and queues
/// `Resolved` unless a tier-3 message is already pending for it.
/// Terminal nodes are left alone.
fn settle_dirty_resolved(&self, node_id: NodeId) {
    let mut state = self.lock_state();
    if state.require_node(node_id).terminal.is_some() {
        return;
    }
    let was_dirty = std::mem::replace(&mut state.require_node_mut(node_id).dirty, true);
    if !was_dirty {
        self.queue_notify(&mut state, node_id, Message::Dirty);
    }
    let has_tier3 = with_wave_state(|ws| match ws.pending_notify.get(&node_id) {
        Some(entry) => entry.iter_messages().any(|m| m.tier() == 3),
        None => false,
    });
    if !has_tier3 {
        self.queue_notify(&mut state, node_id, Message::Resolved);
    }
}
/// Map operator: projects every input of dep 0 through `fn_id` and emits each
/// projected handle verbatim. An empty input batch only records that the node fired.
fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
    let (inputs, _) = self.snapshot_op_dep0(node_id);
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    if !inputs.is_empty() {
        let projected = self.binding.project_each(fn_id, &inputs);
        for out in projected {
            self.commit_emission_verbatim(node_id, out);
        }
    }
}
/// Filter operator: re-emits the inputs of dep 0 for which the predicate `fn_id`
/// returns `true`. If nothing passes, the node is settled with RESOLVED.
/// An empty input batch only records that the node fired.
///
/// # Panics
/// Panics if the binding returns a verdict count different from the input count.
fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
    let (inputs, _) = self.snapshot_op_dep0(node_id);
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    if inputs.is_empty() {
        return;
    }
    let pass = self.binding.predicate_each(fn_id, &inputs);
    assert!(
        pass.len() == inputs.len(),
        "predicate_each returned {} bools for {} inputs",
        pass.len(),
        inputs.len()
    );
    let mut any_emitted = false;
    // Lengths match (asserted above), so pairing by position covers every input.
    for (&h, &keep) in inputs.iter().zip(pass.iter()) {
        if !keep {
            continue;
        }
        // The emission needs its own share of the forwarded input handle.
        self.binding.retain_handle(h);
        self.commit_emission_verbatim(node_id, h);
        any_emitted = true;
    }
    if !any_emitted {
        self.settle_dirty_resolved(node_id);
    }
}
/// Scan operator: folds the inputs of dep 0 into the stored accumulator via
/// `fold_each` and emits every intermediate accumulator. The last one becomes the
/// new stored accumulator. An empty input batch only records that the node fired.
///
/// # Panics
/// Panics if `fold_each` returns a different number of results than inputs.
fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
use crate::op_state::ScanState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let acc = {
let s = self.lock_state();
scratch_ref::<ScanState>(&s, node_id).acc
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let new_states = self.binding.fold_each(fn_id, acc, &inputs);
assert!(
new_states.len() == inputs.len(),
"fold_each returned {} accs for {} inputs",
new_states.len(),
inputs.len()
);
let last_acc = new_states.last().copied();
if let Some(last) = last_acc {
// Store the new accumulator, then adjust shares: the scratch slot takes one
// on `last` and gives up the one it held on the previous accumulator.
let prev_acc = {
let mut s = self.lock_state();
let scratch = scratch_mut::<ScanState>(&mut s, node_id);
let prev = scratch.acc;
scratch.acc = last;
prev
};
self.binding.retain_handle(last);
if prev_acc != crate::handle::NO_HANDLE {
self.binding.release_handle(prev_acc);
}
}
// Every fold result is emitted, including the one now stored in the slot.
for h in new_states {
self.commit_emission_verbatim(node_id, h);
}
}
/// Reduce operator: folds the inputs of dep 0 into the stored accumulator without
/// emitting, and emits the final accumulator only when the upstream completes.
/// An upstream error is forwarded as this node's error.
///
/// # Panics
/// Panics if `fold_each` returns a different number of results than inputs.
fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
use crate::op_state::ReduceState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
let acc = {
let s = self.lock_state();
scratch_ref::<ReduceState>(&s, node_id).acc
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
// The fold is skipped for an empty batch; a terminal may still need handling below.
let new_states = if inputs.is_empty() {
SmallVec::<[HandleId; 1]>::new()
} else {
self.binding.fold_each(fn_id, acc, &inputs)
};
assert!(
new_states.len() == inputs.len(),
"fold_each returned {} accs for {} inputs",
new_states.len(),
inputs.len()
);
let last_acc = new_states.last().copied();
// All fold results except the last are never emitted or stored; they are released below.
let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
new_states[..new_states.len() - 1].to_vec()
} else {
Vec::new()
};
let prev_acc_to_release = if let Some(last) = last_acc {
let prev_acc = {
let mut s = self.lock_state();
let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
let prev = scratch.acc;
scratch.acc = last;
prev
};
// NOTE(review): `last` is not emitted or released in this function, yet an
// additional share is taken for the scratch slot here. If `fold_each` already
// returns an owned share per result (as the release of the intermediates
// suggests), this may be one share too many — confirm against the binding contract.
self.binding.retain_handle(last);
if prev_acc == crate::handle::NO_HANDLE {
None
} else {
Some(prev_acc)
}
} else {
None
};
for h in intermediates_to_release {
self.binding.release_handle(h);
}
if let Some(h) = prev_acc_to_release {
self.binding.release_handle(h);
}
match terminal {
None => {
}
Some(TerminalKind::Complete) => {
// Re-read the slot: it holds the accumulator after this batch was folded in.
let final_acc = {
let s = self.lock_state();
scratch_ref::<ReduceState>(&s, node_id).acc
};
if final_acc != crate::handle::NO_HANDLE {
// The emission gets its own share; the slot keeps its own.
self.binding.retain_handle(final_acc);
self.commit_emission_verbatim(node_id, final_acc);
}
self.complete(node_id);
}
Some(TerminalKind::Error(h)) => {
// Forward the upstream error with a share of its own.
self.binding.retain_handle(h);
self.error(node_id, h);
}
}
}
/// Distinct-until-changed operator: emits an input of dep 0 only when it differs
/// from the previously emitted value, using handle identity first and
/// `equals_fn_id` otherwise. The last emitted handle is remembered in the scratch
/// slot. If nothing is emitted, the node is settled with RESOLVED.
/// An empty input batch only records that the node fired.
fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
use crate::op_state::DistinctState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let mut prev = {
let s = self.lock_state();
scratch_ref::<DistinctState>(&s, node_id).prev
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
// Working share on `prev` for the duration of the loop, separate from the
// share held by the scratch slot.
if prev != crate::handle::NO_HANDLE {
self.binding.retain_handle(prev);
}
let mut emitted = 0usize;
for &h in &inputs {
// No previous value means "different"; identical handles skip the binding call.
let equal = if prev == crate::handle::NO_HANDLE {
false
} else if prev == h {
true
} else {
self.binding.custom_equals(equals_fn_id, prev, h)
};
if !equal {
// First retain: share consumed by the emission.
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
// Second retain: working share for `prev`; the old working share is dropped.
self.binding.retain_handle(h);
let old_prev = prev;
prev = h;
if old_prev != crate::handle::NO_HANDLE {
self.binding.release_handle(old_prev);
}
emitted += 1;
}
}
{
// Write the new `prev` back; if it changed, the slot's old share is released
// (after the lock has been dropped).
let mut s = self.lock_state();
let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
let stale_slot = scratch.prev;
scratch.prev = prev;
if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
drop(s);
self.binding.release_handle(stale_slot);
}
}
// Nothing changed: give back the working share taken before the loop.
// NOTE(review): when values were emitted but the batch ends on the same handle
// the slot started with (e.g. a -> b -> a), neither release above fires for the
// final working share — looks like it could leave one extra share; confirm.
if emitted == 0 && prev != crate::handle::NO_HANDLE {
self.binding.release_handle(prev);
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
/// Pairwise operator: for each input of dep 0, emits `pairwise_pack(prev, input)`
/// and then remembers the input as the new `prev`. The very first value ever seen
/// is only remembered, not emitted. If nothing is emitted, the node is settled
/// with RESOLVED. An empty input batch only records that the node fired.
///
/// Handle ownership: the scratch slot owns exactly one share of the handle it
/// stores. The local `prev` carries its own working share while the loop runs.
/// On write-back that working share becomes the slot's share and the share the
/// slot held before is released exactly once. Previously the slot's handle was
/// released both when the loop advanced past it and again at write-back, i.e.
/// twice whenever `prev` changed.
fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
    use crate::op_state::PairwiseState;
    let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
    let mut prev = {
        let s = self.lock_state();
        scratch_ref::<PairwiseState>(&s, node_id).prev
    };
    {
        let mut s = self.lock_state();
        s.require_node_mut(node_id).has_fired_once = true;
    }
    if inputs.is_empty() {
        return;
    }
    // Working share for the local `prev`, independent of the slot's share, so the
    // in-loop release below never touches the share owned by the slot.
    if prev != crate::handle::NO_HANDLE {
        self.binding.retain_handle(prev);
    }
    let mut emitted = 0usize;
    for &h in &inputs {
        if prev == crate::handle::NO_HANDLE {
            // First value ever: nothing to pair with yet, just remember it.
            self.binding.retain_handle(h);
            prev = h;
            continue;
        }
        let packed = self.binding.pairwise_pack(fn_id, prev, h);
        self.commit_emission_verbatim(node_id, packed);
        // Advance: take a working share on the new value, drop the old working share.
        self.binding.retain_handle(h);
        let old_prev = prev;
        prev = h;
        self.binding.release_handle(old_prev);
        emitted += 1;
    }
    // Write back under the lock; release the slot's previous share after unlocking.
    let stale_slot = {
        let mut s = self.lock_state();
        let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
        let stale = scratch.prev;
        scratch.prev = prev;
        stale
    };
    // Unconditional on purpose: even if the batch ended on the same handle the slot
    // started with, the slot now owns the working share, so its old share must go.
    if stale_slot != crate::handle::NO_HANDLE {
        self.binding.release_handle(stale_slot);
    }
    if emitted == 0 {
        self.settle_dirty_resolved(node_id);
    }
}
/// Collects, for every dep of `node_id`, the newest handle available: the last
/// entry of this wave's data batch, or the dep's `prev_data` when the batch is empty.
///
/// The boolean reports whether the first dep delivered data in this wave.
fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
    let state = self.lock_state();
    let deps = &state.require_node(node_id).dep_records;
    let latest: SmallVec<[HandleId; 4]> = deps
        .iter()
        .map(|dr| match dr.data_batch.last() {
            Some(&newest) => newest,
            None => dr.prev_data,
        })
        .collect();
    let primary_fired = match deps.first() {
        Some(dr) => !dr.data_batch.is_empty(),
        None => false,
    };
    (latest, primary_fired)
}
/// Combine operator: packs the latest value of every dep into a tuple via
/// `pack_fn` and emits it. If any dep has no value yet, the node is settled with
/// RESOLVED instead.
fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
    let (latest, _) = self.snapshot_op_all_latest(node_id);
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    let all_present = latest.iter().all(|&h| h != crate::handle::NO_HANDLE);
    if all_present {
        let packed = self.binding.pack_tuple(pack_fn, &latest);
        self.commit_emission_verbatim(node_id, packed);
    } else {
        self.settle_dirty_resolved(node_id);
    }
}
/// With-latest-from operator: when the primary dep (index 0) delivered data this
/// wave, or on the node's very first fire, emits the packed pair of the latest
/// primary and secondary values. In every other case, including a secondary
/// without a value, the node is settled with RESOLVED.
fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
    let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
    let first_fire = {
        let mut state = self.lock_state();
        let had_fired = std::mem::replace(&mut state.require_node_mut(node_id).has_fired_once, true);
        !had_fired
    };
    // Only the primary drives emissions, except on the first fire.
    let primary_drove = first_fire || primary_fired;
    if !primary_drove {
        self.settle_dirty_resolved(node_id);
        return;
    }
    debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
    let secondary = latest[1];
    if secondary == crate::handle::NO_HANDLE {
        self.settle_dirty_resolved(node_id);
        return;
    }
    let packed = self.binding.pack_tuple(pack_fn, &latest);
    self.commit_emission_verbatim(node_id, packed);
}
/// Merge operator: re-emits every handle delivered by any dep in this wave,
/// dep by dep in dep order. With no data at all, the node is settled with RESOLVED.
fn fire_op_merge(&self, node_id: NodeId) {
    let all_handles: Vec<HandleId> = {
        let state = self.lock_state();
        let mut gathered = Vec::new();
        for dr in state.require_node(node_id).dep_records.iter() {
            gathered.extend(dr.data_batch.iter().copied());
        }
        gathered
    };
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    if all_handles.is_empty() {
        self.settle_dirty_resolved(node_id);
        return;
    }
    for h in all_handles {
        // Each forwarded handle gets a share of its own for the emission.
        self.binding.retain_handle(h);
        self.commit_emission_verbatim(node_id, h);
    }
}
/// Take operator: forwards inputs of dep 0 until `count` values have been emitted
/// in total, then completes the node. The running total lives in the scratch
/// slot (`TakeState::count_emitted`).
fn fire_op_take(&self, node_id: NodeId, count: u32) {
use crate::op_state::TakeState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
let mut count_emitted = {
let s = self.lock_state();
scratch_ref::<TakeState>(&s, node_id).count_emitted
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
// Quota already met (this includes `count == 0`): complete without emitting.
if count_emitted >= count {
self.complete(node_id);
return;
}
for &h in &inputs {
// The emission needs its own share of the forwarded input handle.
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
count_emitted = count_emitted.saturating_add(1);
if count_emitted >= count {
break;
}
}
{
let mut s = self.lock_state();
scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
}
if count_emitted >= count {
self.complete(node_id);
return;
}
// NOTE(review): an upstream terminal that arrives before the quota is met is not
// handled here; presumably it is propagated elsewhere — confirm.
if inputs.is_empty() && terminal.is_none() {
self.settle_dirty_resolved(node_id);
}
}
/// Skip operator: swallows the first `count` inputs of dep 0 (counted across
/// fires via `SkipState::count_skipped`) and forwards everything after that.
/// If nothing is forwarded, the node is settled with RESOLVED.
fn fire_op_skip(&self, node_id: NodeId, count: u32) {
    use crate::op_state::SkipState;
    let (inputs, _) = self.snapshot_op_dep0(node_id);
    let mut count_skipped = {
        let state = self.lock_state();
        scratch_ref::<SkipState>(&state, node_id).count_skipped
    };
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    let mut forwarded_any = false;
    for h in inputs.iter().copied() {
        if count_skipped >= count {
            // Quota of skipped values is used up: forward with a share of its own.
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
            forwarded_any = true;
        } else {
            count_skipped = count_skipped.saturating_add(1);
        }
    }
    {
        let mut state = self.lock_state();
        scratch_mut::<SkipState>(&mut state, node_id).count_skipped = count_skipped;
    }
    if !forwarded_any {
        self.settle_dirty_resolved(node_id);
    }
}
/// Take-while operator: forwards the inputs of dep 0 up to (excluding) the first
/// one the predicate `fn_id` rejects, then completes the node. If every input
/// passes, the node stays open. An empty input batch only records that the node fired.
///
/// # Panics
/// Panics if the binding returns a verdict count different from the input count.
fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
    let (inputs, _) = self.snapshot_op_dep0(node_id);
    self.lock_state().require_node_mut(node_id).has_fired_once = true;
    if inputs.is_empty() {
        return;
    }
    let pass = self.binding.predicate_each(fn_id, &inputs);
    assert!(
        pass.len() == inputs.len(),
        "predicate_each returned {} bools for {} inputs",
        pass.len(),
        inputs.len()
    );
    // Length of the leading run of accepted inputs (lengths match, asserted above).
    let passing = pass.iter().take_while(|&&ok| ok).count();
    for &h in &inputs[..passing] {
        // The emission needs its own share of the forwarded input handle.
        self.binding.retain_handle(h);
        self.commit_emission_verbatim(node_id, h);
    }
    // A rejected input exists exactly when the accepted run is shorter than the batch.
    if passing < inputs.len() {
        self.complete(node_id);
        return;
    }
    if passing == 0 {
        self.settle_dirty_resolved(node_id);
    }
}
/// Last operator: remembers the newest input of dep 0 in the scratch slot without
/// emitting. When the upstream completes, it emits the remembered value (or the
/// configured default if none was seen) and completes. An upstream error is
/// forwarded as this node's error.
fn fire_op_last(&self, node_id: NodeId) {
use crate::op_state::LastState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
// Only the final input of the batch matters.
if let Some(&new_latest) = inputs.last() {
let prev_latest = {
let mut s = self.lock_state();
let scratch = scratch_mut::<LastState>(&mut s, node_id);
let prev = scratch.latest;
scratch.latest = new_latest;
prev
};
// The slot takes a share on the new value and gives up the one on the old.
self.binding.retain_handle(new_latest);
if prev_latest != crate::handle::NO_HANDLE {
self.binding.release_handle(prev_latest);
}
}
match terminal {
None => {}
Some(TerminalKind::Complete) => {
let (latest, default) = {
let s = self.lock_state();
let scratch = scratch_ref::<LastState>(&s, node_id);
(scratch.latest, scratch.default)
};
// Preference order: remembered value, then default, then nothing.
let to_emit = if latest != crate::handle::NO_HANDLE {
Some(latest)
} else if default != crate::handle::NO_HANDLE {
Some(default)
} else {
None
};
if let Some(h) = to_emit {
// The emission gets its own share; the slot keeps its own.
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
self.complete(node_id);
}
Some(TerminalKind::Error(h)) => {
// Forward the upstream error with a share of its own.
self.binding.retain_handle(h);
self.error(node_id, h);
}
}
}
fn fire_op_tap(&self, node_id: NodeId, fn_id: FnId) {
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
if terminal.is_none() {
self.settle_dirty_resolved(node_id);
}
} else {
for &h in &inputs {
self.binding.invoke_tap_fn(fn_id, h);
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
}
match terminal {
None => {}
Some(TerminalKind::Complete) => {
self.binding.invoke_tap_complete_fn(fn_id);
self.complete(node_id);
}
Some(TerminalKind::Error(h)) => {
self.binding.invoke_tap_error_fn(fn_id, h);
self.binding.retain_handle(h);
self.error(node_id, h);
}
}
}
/// Fires a `tap_first` operator node for the current wave.
///
/// All dep-0 inputs are re-emitted verbatim. The tap callback is invoked only for the
/// first input the node ever forwards; the `fired` flag in the node's `TapFirstState`
/// scratch records that it has run. A wave with neither inputs nor a terminal settles
/// the node, and a source terminal is forwarded after any inputs.
fn fire_op_tap_first(&self, node_id: NodeId, fn_id: FnId) {
    use crate::op_state::TapFirstState;
    let (inputs, terminal) = self.snapshot_op_dep0(node_id);
    {
        let mut s = self.lock_state();
        s.require_node_mut(node_id).has_fired_once = true;
    }
    if inputs.is_empty() {
        if terminal.is_none() {
            self.settle_dirty_resolved(node_id);
        }
    } else {
        let mut fired = {
            let s = self.lock_state();
            scratch_ref::<TapFirstState>(&s, node_id).fired
        };
        for &h in &inputs {
            if !fired {
                self.binding.invoke_tap_fn(fn_id, h);
                {
                    // Scoped so the state lock is released before the emission below.
                    let mut s = self.lock_state();
                    scratch_mut::<TapFirstState>(&mut s, node_id).fired = true;
                }
                // Keep the local copy in sync; otherwise every input in the first
                // batch would invoke the tap callback, not just the first one.
                fired = true;
            }
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
        }
    }
    if let Some(TerminalKind::Complete) = terminal {
        self.complete(node_id);
    } else if let Some(TerminalKind::Error(h)) = terminal {
        self.binding.retain_handle(h);
        self.error(node_id, h);
    }
}
fn fire_op_valve(&self, node_id: NodeId) {
let (src_inputs, src_terminal, ctrl_latest) = {
let s = self.lock_state();
let rec = s.require_node(node_id);
debug_assert!(rec.dep_records.len() == 2, "valve must have exactly 2 deps");
let dr0 = &rec.dep_records[0];
let dr1 = &rec.dep_records[1];
let src_inputs: Vec<HandleId> = dr0.data_batch.iter().copied().collect();
let src_term = dr0.terminal;
let ctrl = dr1.data_batch.last().copied().unwrap_or(dr1.prev_data);
(src_inputs, src_term, ctrl)
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if let Some(TerminalKind::Complete) = src_terminal {
self.complete(node_id);
return;
}
if let Some(TerminalKind::Error(h)) = src_terminal {
self.binding.retain_handle(h);
self.error(node_id, h);
return;
}
let gate_open = ctrl_latest != crate::handle::NO_HANDLE;
if !gate_open {
self.settle_dirty_resolved(node_id);
return;
}
if src_inputs.is_empty() {
let prev_src = {
let s = self.lock_state();
s.require_node(node_id).dep_records[0].prev_data
};
if prev_src == crate::handle::NO_HANDLE {
self.settle_dirty_resolved(node_id);
} else {
self.binding.retain_handle(prev_src);
self.commit_emission_verbatim(node_id, prev_src);
}
} else {
for &h in &src_inputs {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
}
}
fn fire_op_settle(&self, node_id: NodeId, quiet_waves: u32, max_waves: Option<u32>) {
use crate::op_state::SettleState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if let Some(TerminalKind::Complete) = terminal {
self.complete(node_id);
return;
}
if let Some(TerminalKind::Error(h)) = terminal {
self.binding.retain_handle(h);
self.error(node_id, h);
return;
}
let saw_data = !inputs.is_empty();
for &h in &inputs {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
let should_complete = {
let mut s = self.lock_state();
let scratch = scratch_mut::<SettleState>(&mut s, node_id);
scratch.wave_count += 1;
if saw_data {
scratch.has_value = true;
scratch.quiet_count = 0;
} else {
scratch.quiet_count += 1;
}
let settled = scratch.has_value && scratch.quiet_count >= quiet_waves;
let exhausted = max_waves.is_some_and(|max| scratch.wave_count >= max);
settled || exhausted
};
if should_complete {
self.complete(node_id);
} else if !saw_data {
self.settle_dirty_resolved(node_id);
}
}
/// Records a data handle delivered to dependency slot `dep_idx` of `consumer_id` and,
/// when appropriate, schedules the consumer to fire in this wave.
///
/// The handle is retained and appended to the dep's batch, and the consumer and dep are
/// marked as involved in the wave (the bit masks only track the first 64 deps). The
/// consumer is added to `pending_fires` unless one of these holds:
/// - it is paused with `PausableMode::Default` (a pending wave is noted instead),
/// - it is a state node,
/// - it is dynamic, has already fired once, and does not track `dep_idx`.
pub(crate) fn deliver_data_to_consumer(
    &self,
    s: &mut CoreState,
    consumer_id: NodeId,
    dep_idx: usize,
    handle: HandleId,
) {
    self.binding.retain_handle(handle);
    let schedule_fire = {
        let consumer = s.require_node_mut(consumer_id);
        {
            let dep = &mut consumer.dep_records[dep_idx];
            dep.data_batch.push(handle);
            dep.involved_this_wave = true;
        }
        consumer.involved_this_wave = true;
        if dep_idx < 64 {
            let bit = 1u64 << dep_idx;
            consumer.received_mask |= bit;
            consumer.involved_mask |= bit;
        }
        let dynamic = consumer.is_dynamic;
        let state_node = consumer.is_state();
        let tracked_or_first_fire =
            !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
        let held_by_pause = consumer.pause_state.is_paused()
            && consumer.pausable == crate::node::PausableMode::Default;
        if held_by_pause {
            consumer.pause_state.mark_pending_wave();
        }
        !held_by_pause && !state_node && (!dynamic || tracked_or_first_fire)
    };
    if schedule_fire {
        with_wave_state(|ws| {
            ws.pending_fires.insert(consumer_id);
        });
    }
}
/// Queues `msg` for delivery to the subscribers of `node_id`.
///
/// Nothing is queued (and no handle is retained) when the node has no subscribers.
/// A tier-3 or tier-4 message for a node that is currently paused, and whose pausable
/// mode buffers such messages, goes into the node's pause buffer instead of the wave's
/// pending notifications. In every queued case the message's payload handle, if any,
/// is retained first.
pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
// Debug-only invariant check for tier-3 messages: within one wave a node must not mix
// Data with Resolved, and must not queue more than one Resolved.
#[cfg(debug_assertions)]
if matches!(msg.tier(), 3) {
with_wave_state(|ws| {
if let Some(entry) = ws.pending_notify.get(&node_id) {
let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
let resolved_count = entry
.iter_messages()
.filter(|m| matches!(m, Message::Resolved))
.count();
let incoming_is_data = matches!(msg, Message::Data(_));
if incoming_is_data {
debug_assert!(
resolved_count == 0,
"R1.3.3.a violation at {node_id:?}: queueing Data into a \
wave that already contains Resolved — Slice G should have \
prevented this via wave-end coalescing"
);
} else {
debug_assert!(
!has_data,
"R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
wave that already contains Data"
);
debug_assert!(
resolved_count == 0,
"R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
wave at one node"
);
}
}
});
}
// Only tier 3 and tier 4 messages are candidates for pause buffering.
let buffered_tier = matches!(msg.tier(), 3 | 4);
let cap = s.pause_buffer_cap;
{
let rec = s.require_node_mut(node_id);
// No subscribers: nothing to deliver, so return before retaining anything.
if rec.subscribers.is_empty() {
return;
}
// `ResumeAll` always buffers while paused, `Default` buffers only for state nodes,
// `Off` never buffers.
let mode_buffers_tier3 = match rec.pausable {
crate::node::PausableMode::ResumeAll => true,
crate::node::PausableMode::Default => rec.is_state(),
crate::node::PausableMode::Off => false,
};
if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
// The pause buffer takes its own reference on the payload.
if let Some(h) = msg.payload_handle() {
self.binding.retain_handle(h);
}
let push_result = rec.pause_state.push_buffered(msg, cap);
// Messages the push reported as dropped give their payload references back.
for dm in push_result.dropped_msgs {
if let Some(h) = dm.payload_handle() {
self.binding.release_handle(h);
}
}
// Record one overflow diagnostic for the first overflow of the cycle.
if push_result.first_overflow_this_cycle {
if let Some((dropped_count, lock_held_ns)) =
rec.pause_state.overflow_diagnostic()
{
with_wave_state(|ws| {
ws.pending_pause_overflow
.push(crate::node::PendingPauseOverflow {
node_id,
dropped_count,
configured_max: cap.unwrap_or(0),
lock_held_ns,
});
});
}
}
return;
}
}
// Normal path: the pending-notify queue holds a reference on the payload.
if let Some(h) = msg.payload_handle() {
self.binding.retain_handle(h);
}
Self::push_into_pending_notify(s, node_id, msg);
}
/// Appends `msg` to the wave's pending notifications for `node_id`.
///
/// Messages are grouped into batches, each carrying the subscriber snapshot that was
/// current when the batch was opened. A new batch (with a fresh snapshot of the node's
/// subscribers) is opened when the node has no pending batch yet or when its
/// `subscribers_revision` differs from the last batch's; otherwise the message joins
/// the last batch.
fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
    let node = s.require_node(node_id);
    let revision = node.subscribers_revision;
    let open_new_batch = with_wave_state(|ws| {
        let last_batch_is_current = ws
            .pending_notify
            .get(&node_id)
            .and_then(|per_node| per_node.batches.last())
            .is_some_and(|batch| batch.snapshot_revision == revision);
        !last_batch_is_current
    });
    // Only pay for the subscriber snapshot when a batch is actually being opened.
    let sinks: SmallVec<[Sink; 1]> = if open_new_batch {
        node.subscribers.values().cloned().collect()
    } else {
        SmallVec::new()
    };
    with_wave_state(|ws| {
        // A missing entry always implies `open_new_batch`, so the empty placeholder
        // created here receives its first batch immediately below.
        let per_node = ws
            .pending_notify
            .entry(node_id)
            .or_insert_with(|| PendingPerNode {
                batches: SmallVec::new(),
            });
        if open_new_batch {
            per_node.batches.push(PendingBatch {
                snapshot_revision: revision,
                sinks,
                messages: smallvec::smallvec![msg],
            });
        } else {
            per_node
                .batches
                .last_mut()
                .expect("non-empty by construction (entry exists implies batch exists)")
                .messages
                .push(msg);
        }
    });
}
/// Converts the wave's pending notifications into deferred sink jobs.
///
/// The pending map is taken out of the wave state and walked once per phase, in the
/// order given by `PHASES` (tier 1, then tiers 3 and 4, then tier 5, then tier 6). Each
/// batch with sinks contributes one job per phase containing its messages of that
/// phase's tiers. The jobs are appended to `deferred_flush_jobs`, and the payload
/// handle of every pending message is queued in `deferred_handle_releases`.
fn flush_notifications(&self, s: &mut CoreState) {
    const PHASES: &[&[u8]] = &[&[1], &[3, 4], &[5], &[6]];
    let _ = &*s;
    let pending = with_wave_state(|ws| std::mem::take(&mut ws.pending_notify));
    let mut jobs: DeferredJobs = Vec::new();
    for tiers in PHASES.iter().copied() {
        let deliverable = pending
            .values()
            .flat_map(|per_node| per_node.batches.iter())
            .filter(|batch| !batch.sinks.is_empty());
        for batch in deliverable {
            let selected: Vec<Message> = batch
                .messages
                .iter()
                .filter(|m| tiers.contains(&m.tier()))
                .copied()
                .collect();
            if selected.is_empty() {
                continue;
            }
            let targets: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
            jobs.push((targets, selected));
        }
    }
    with_wave_state(|ws| {
        ws.deferred_flush_jobs.append(&mut jobs);
        // Every pending message gives up its payload reference, delivered or not.
        ws.deferred_handle_releases.extend(
            pending
                .values()
                .flat_map(|per_node| per_node.iter_messages())
                .filter_map(|m| m.payload_handle()),
        );
    });
}
/// Takes the deferred work accumulated in `ws`, leaving each queue empty.
///
/// Returns, in order: flush jobs, handle releases, cleanup hooks, and pending wipes.
pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
    let flush_jobs = std::mem::take(&mut ws.deferred_flush_jobs);
    let handle_releases = std::mem::take(&mut ws.deferred_handle_releases);
    let cleanup_hooks = std::mem::take(&mut ws.deferred_cleanup_hooks);
    let wipes = std::mem::take(&mut ws.pending_wipes);
    (flush_jobs, handle_releases, cleanup_hooks, wipes)
}
pub(crate) fn fire_deferred(
&self,
jobs: DeferredJobs,
releases: Vec<HandleId>,
cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
pending_wipes: Vec<crate::handle::NodeId>,
) {
let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
for (sinks, msgs) in jobs {
for sink in &sinks {
let sink = sink.clone();
let msgs_ref = &msgs;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
sink(msgs_ref);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
}
for h in releases {
self.binding.release_handle(h);
}
for (node_id, trigger) in cleanup_hooks {
let binding = &self.binding;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
binding.cleanup_for(node_id, trigger);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
for node_id in pending_wipes {
let binding = &self.binding;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
binding.wipe_ctx(node_id);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
if let Some(payload) = last_panic {
std::panic::resume_unwind(payload);
}
}
/// Runs `f` inside a batch scope.
///
/// A batch guard is acquired before `f` is called and dropped once `f` has returned
/// (or while unwinding if `f` panics).
pub fn batch<F>(&self, f: F)
where
    F: FnOnce(),
{
    let guard = self.begin_batch();
    f();
    drop(guard);
}
/// Opens a batch that holds the wave-owner lock of every partition.
///
/// The registry epoch is sampled before and after the locks are taken. If it changed
/// in between, the locks are released and the attempt is repeated, up to
/// `MAX_LOCK_RETRIES` times.
///
/// # Panics
/// Panics if a partition lock cannot be acquired, or if the retry limit is exceeded.
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub fn begin_batch(&self) -> BatchGuard {
    for _attempt in 0..crate::subgraph::MAX_LOCK_RETRIES {
        let epoch_before = self.registry.lock().epoch();
        let partition_boxes = self.all_partitions_lock_boxes();
        let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
        for (sid, _lock_box) in &partition_boxes {
            let representative = crate::handle::NodeId::new(sid.raw());
            let guard = self
                .partition_wave_owner_lock_arc(representative)
                .unwrap_or_else(|e| panic!("{e}"));
            wave_guards.push(guard);
        }
        let epoch_after = self.registry.lock().epoch();
        if epoch_after != epoch_before {
            // The registry changed while locking; release and try again.
            drop(wave_guards);
            std::thread::yield_now();
            continue;
        }
        return self.begin_batch_with_guards(wave_guards);
    }
    panic!(
        "Core::begin_batch: exceeded {} retries — pathological concurrent \
         register/union/split activity racing with closure-form batch entry",
        crate::subgraph::MAX_LOCK_RETRIES
    );
}
/// Opens a batch scoped to the partitions touched from `seed`.
///
/// # Panics
/// Panics with the error's message if `try_begin_batch_for` fails.
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
    self.try_begin_batch_for(seed)
        .unwrap_or_else(|e| panic!("{e}"))
}
/// Opens a batch that holds the wave-owner locks of the partitions touched from `seed`.
///
/// The touched partition set comes from the thread-local `PARTITION_CACHE` when its
/// entry matches this core's generation, the seed, and the current registry epoch;
/// otherwise it is recomputed and cached. If the registry epoch changes while the locks
/// are being taken, the cache is cleared, the locks are released, and the attempt is
/// repeated, up to `MAX_LOCK_RETRIES` times.
///
/// # Errors
/// Returns the `PartitionOrderViolation` reported by a partition lock acquisition; any
/// locks already taken are released first.
///
/// # Panics
/// Panics if the retry limit is exceeded.
pub(crate) fn try_begin_batch_for(
    &self,
    seed: crate::handle::NodeId,
) -> Result<BatchGuard, crate::node::PartitionOrderViolation> {
    let core_generation = self.generation;
    for _attempt in 0..crate::subgraph::MAX_LOCK_RETRIES {
        let epoch_before = self.registry.lock().epoch();
        let cached = PARTITION_CACHE.with(|cell| match &*cell.borrow() {
            Some(c)
                if c.core_generation == core_generation
                    && c.seed == seed
                    && c.epoch == epoch_before =>
            {
                Some(c.partitions.clone())
            }
            _ => None,
        });
        let touched = match cached {
            Some(partitions) => partitions,
            None => {
                let computed = self.compute_touched_partitions(seed);
                PARTITION_CACHE.with(|cell| {
                    *cell.borrow_mut() = Some(PartitionCache {
                        core_generation,
                        seed,
                        epoch: epoch_before,
                        partitions: computed.clone(),
                    });
                });
                computed
            }
        };
        let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
        for sid in &touched {
            let representative = crate::handle::NodeId::new(sid.raw());
            match self.partition_wave_owner_lock_arc(representative) {
                Ok(guard) => wave_guards.push(guard),
                Err(violation) => {
                    // Give back whatever was locked so far before reporting.
                    drop(wave_guards);
                    return Err(violation);
                }
            }
        }
        let epoch_after = self.registry.lock().epoch();
        if epoch_after == epoch_before {
            return Ok(self.begin_batch_with_guards(wave_guards));
        }
        // The partition layout may have changed; the cached set can no longer be used.
        PARTITION_CACHE.with(|cell| {
            *cell.borrow_mut() = None;
        });
        drop(wave_guards);
        std::thread::yield_now();
    }
    panic!(
        "Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
         pathological concurrent register/union/split activity racing \
         with per-seed batch entry",
        crate::subgraph::MAX_LOCK_RETRIES
    );
}
/// Reports whether this core's generation is in the current thread's `IN_TICK_OWNED` set.
#[must_use]
fn in_tick(&self) -> bool {
    let generation = self.generation;
    IN_TICK_OWNED.with(|owned| owned.borrow().contains(&generation))
}
/// Adds this core's generation to the current thread's `IN_TICK_OWNED` set.
///
/// Returns `true` if the generation was newly inserted, `false` if it was already there.
fn claim_in_tick(&self) -> bool {
    let generation = self.generation;
    IN_TICK_OWNED.with(|owned| owned.borrow_mut().insert(generation))
}
/// Removes this core's generation from the current thread's `IN_TICK_OWNED` set.
fn clear_in_tick(&self) {
    let generation = self.generation;
    IN_TICK_OWNED.with(|owned| {
        owned.borrow_mut().remove(&generation);
    });
}
/// Builds a `BatchGuard` around already-acquired wave-owner guards.
///
/// The tick is claimed for this thread; if this call is the one that claimed it, the
/// tier-3 tracking and the outermost wave state are reset before the guard is returned.
fn begin_batch_with_guards(
    &self,
    wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
) -> BatchGuard {
    let claimed = self.claim_in_tick();
    if claimed {
        tier3_clear();
        wave_state_clear_outermost();
    }
    BatchGuard {
        core: self.clone(),
        owns_tick: claimed,
        wave_guards,
        _not_send: std::marker::PhantomData,
    }
}
}
/// Scope guard for a batch; its `Drop` implementation drains the wave.
///
/// Bind it to a named variable so it lives until the end of the batch scope.
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
// Handle to the core this batch belongs to.
core: Core,
// Whether this guard claimed the tick; when false, dropping the guard does nothing.
owns_tick: bool,
// Wave-owner guards held for the duration of the batch.
wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
// A raw-pointer marker: it keeps the guard from being `Send` or `Sync`.
_not_send: std::marker::PhantomData<*const ()>,
}
impl BatchGuard {
/// Throws away everything the current wave has queued, without delivering it.
///
/// Under the state lock this takes the pending notifications, drops the deferred
/// flush jobs, clears the pending fires, restores the wave cache snapshots, clears the
/// per-wave state, and drops the deferred cleanup hooks and pending wipes. After the
/// lock is released it releases the payload handles of the discarded notifications,
/// the deferred handle releases, and the handles returned by the snapshot restore.
/// Finally it clears the tier-3 tracking and discards the deferred producer ops,
/// releasing the handles carried by `Emit` and `Error` ops.
fn discard_wave_cleanup(&self) {
let (pending, deferred_releases, restored_releases) = {
let mut s = self.core.lock_state();
with_wave_state(|ws| {
let pending = std::mem::take(&mut ws.pending_notify);
// Taken only to drop them: the queued sink jobs will not run.
let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
ws.pending_fires.clear();
let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
s.clear_wave_state(ws);
ws.clear_wave_state();
let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
// Cleanup hooks and wipes queued by the discarded wave are dropped unrun.
let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
std::mem::take(&mut ws.deferred_cleanup_hooks);
let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
(pending, deferred_releases, restored)
})
};
// Handle releases happen here, after the state lock above has been dropped.
for entry in pending.values() {
for msg in entry.iter_messages() {
if let Some(h) = msg.payload_handle() {
self.core.binding.release_handle(h);
}
}
}
for h in deferred_releases {
self.core.binding.release_handle(h);
}
for h in restored_releases {
self.core.binding.release_handle(h);
}
tier3_clear();
{
let mut ops = self.core.deferred_producer_ops.lock();
let discarded = std::mem::take(&mut *ops);
for op in discarded {
match op {
// Only these two op kinds carry a handle that must be released.
crate::node::DeferredProducerOp::Emit { handle, .. }
| crate::node::DeferredProducerOp::Error { handle, .. } => {
self.core.binding.release_handle(handle);
}
_ => {} }
}
}
}
}
impl Drop for BatchGuard {
/// Ends the batch.
///
/// A guard that does not own the tick does nothing. If the thread is already
/// panicking, or if draining the wave panics, the queued wave work is discarded and
/// the tick is released (the drain panic is then resumed). Otherwise the wave is
/// drained, the per-wave state is cleared, the deferred work is collected and fired,
/// snapshot handles are released, the wave-owner guards are dropped, and the deferred
/// producer ops are drained.
fn drop(&mut self) {
if !self.owns_tick {
return;
}
// Unwinding through the batch scope: do not run the wave, just clean up.
if std::thread::panicking() {
self.discard_wave_cleanup();
self.core.clear_in_tick();
return;
}
// A panic during the drain gets the same cleanup before it is propagated.
if let Err(payload) =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.core.drain_and_flush()))
{
self.discard_wave_cleanup();
self.core.clear_in_tick();
std::panic::resume_unwind(payload);
}
// Collect the deferred work under the state lock; it is fired after the lock guard
// goes out of scope at the end of this block.
let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
let mut s = self.core.lock_state();
let result = with_wave_state(|ws| {
s.clear_wave_state(ws);
ws.clear_wave_state();
let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
(jobs, releases, hooks, wipes, snapshot_releases)
});
self.core.clear_in_tick();
result
};
self.core
.fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
for h in snapshot_releases {
self.core.binding.release_handle(h);
}
tier3_clear();
// Drop the wave-owner guards from the back of the list, i.e. in the reverse of the
// order they were pushed.
while let Some(guard) = self.wave_guards.pop() {
drop(guard);
}
// Runs only after the wave-owner guards above have been dropped.
self.core.drain_deferred_producer_ops();
}
}