use std::cell::RefCell;
use std::sync::Arc;
use ahash::AHashSet;
use indexmap::map::Entry;
use smallvec::SmallVec;
use crate::boundary::{DepBatch, FnEmission, FnResult};
use crate::handle::{HandleId, NodeId, NO_HANDLE};
use crate::message::Message;
use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
thread_local! {
static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}
fn tier3_check(node: NodeId) -> bool {
TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
}
fn tier3_mark(node: NodeId) {
TIER3_EMITTED_THIS_WAVE.with(|s| {
s.borrow_mut().insert(node);
});
}
fn tier3_clear() {
TIER3_EMITTED_THIS_WAVE.with(|s| {
s.borrow_mut().clear();
});
}
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
pub(crate) type WaveDeferred = (
DeferredJobs,
Vec<HandleId>,
Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
Vec<crate::handle::NodeId>,
);
pub(crate) struct PendingBatch {
pub(crate) snapshot_revision: u64,
pub(crate) sinks: Vec<Sink>,
pub(crate) messages: Vec<Message>,
}
pub(crate) struct PendingPerNode {
pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}
impl PendingPerNode {
pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
self.batches.iter().flat_map(|b| b.messages.iter())
}
pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
}
}
pub(crate) struct FiringGuard {
core: Core,
node_id: NodeId,
is_producer_build: bool,
}
impl FiringGuard {
pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
let is_producer = {
let mut s = core.lock_state();
s.currently_firing.push(node_id);
s.nodes
.get(&node_id)
.is_some_and(crate::node::NodeRecord::is_producer)
};
let guard = Self {
core: core.clone(),
node_id,
is_producer_build: is_producer,
};
if is_producer {
crate::node::producer_build_enter();
}
guard
}
}
impl Drop for FiringGuard {
fn drop(&mut self) {
#[cfg(debug_assertions)]
{
let s = self.core.lock_state();
let now_producer = s
.nodes
.get(&self.node_id)
.is_some_and(crate::node::NodeRecord::is_producer);
if s.nodes.contains_key(&self.node_id) {
debug_assert_eq!(
self.is_producer_build,
now_producer,
"FiringGuard invariant violation: node {:?} was {} at \
construction but is {} at Drop. The is_producer flag \
must be stable for a node's lifetime; see FiringGuard \
struct docstring.",
self.node_id,
if self.is_producer_build {
"is_producer=true"
} else {
"is_producer=false"
},
if now_producer {
"is_producer=true"
} else {
"is_producer=false"
},
);
}
drop(s);
}
let mut s = self.core.lock_state();
if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
s.currently_firing.swap_remove(pos);
}
drop(s);
if self.is_producer_build {
crate::node::producer_build_exit();
}
}
}
fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
s.require_node(node_id)
.op_scratch
.as_ref()
.expect("op_scratch slot uninitialized for operator node")
.as_any_ref()
.downcast_ref::<T>()
.expect("op_scratch type mismatch")
}
fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
s.require_node_mut(node_id)
.op_scratch
.as_mut()
.expect("op_scratch slot uninitialized for operator node")
.as_any_mut()
.downcast_mut::<T>()
.expect("op_scratch type mismatch")
}
impl Core {
pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
where
F: FnOnce(&Self),
{
let _guard = self.begin_batch_for(seed);
op(self);
}
#[must_use]
pub(crate) fn drain_wave_cache_snapshots(
cps: &mut crate::node::CrossPartitionState,
) -> Vec<HandleId> {
if cps.wave_cache_snapshots.is_empty() {
return Vec::new();
}
std::mem::take(&mut cps.wave_cache_snapshots)
.into_iter()
.map(|(_, h)| h)
.collect()
}
pub(crate) fn restore_wave_cache_snapshots(
&self,
s: &mut CoreState,
cps: &mut crate::node::CrossPartitionState,
) -> Vec<HandleId> {
if cps.wave_cache_snapshots.is_empty() {
return Vec::new();
}
let snapshots = std::mem::take(&mut cps.wave_cache_snapshots);
let mut releases = Vec::with_capacity(snapshots.len());
for (node_id, old_handle) in snapshots {
let Some(rec) = s.nodes.get_mut(&node_id) else {
releases.push(old_handle);
continue;
};
let current = std::mem::replace(&mut rec.cache, old_handle);
if current != NO_HANDLE {
releases.push(current);
}
}
releases
}
pub(crate) fn drain_and_flush(&self) {
let mut guard = 0u32;
loop {
let synth_pending = {
let s = self.lock_state();
if s.pending_fires.is_empty() {
let mut cps = self.lock_cross_partition();
if cps.pending_pause_overflow.is_empty() {
Vec::new()
} else {
std::mem::take(&mut cps.pending_pause_overflow)
}
} else {
Vec::new()
}
};
for entry in synth_pending {
let handle = self.binding.synthesize_pause_overflow_error(
entry.node_id,
entry.dropped_count,
entry.configured_max,
entry.lock_held_ns / 1_000_000,
);
if let Some(h) = handle {
self.error(entry.node_id, h);
}
}
let (next, cap, pending_size) = {
let s = self.lock_state();
if s.pending_fires.is_empty() {
break;
}
let cap = s.max_batch_drain_iterations;
let pending_size = s.pending_fires.len();
let next = self.pick_next_fire(&s);
(next, cap, pending_size)
};
guard += 1;
assert!(
guard < cap,
"wave drain exceeded {cap} iterations \
(pending_fires={pending_size}). Most likely cause: a runtime \
cycle introduced by an operator that re-arms its own pending_fires \
slot from inside `invoke_fn` (e.g. a producer that subscribes to \
itself, or a fn that calls Core::emit on a node whose fn fires \
the original node again). Structural cycles via set_deps are \
rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
only with concrete evidence the workload needs more iterations."
);
let Some(next) = next else { break };
self.fire_fn(next);
}
let mut s = self.lock_state();
let candidates = {
let mut cps = self.lock_cross_partition();
std::mem::take(&mut cps.pending_auto_resolve)
};
for node_id in candidates {
let needs_resolve = s
.pending_notify
.get(&node_id)
.is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3));
if needs_resolve {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
}
self.flush_notifications(&mut s);
}
fn pick_next_fire(&self, s: &CoreState) -> Option<NodeId> {
for &id in &s.pending_fires {
if Self::transitive_upstream_settled(s, id) {
return Some(id);
}
}
s.pending_fires.iter().copied().next()
}
fn transitive_upstream_settled(s: &CoreState, node_id: NodeId) -> bool {
let rec = s.require_node(node_id);
if rec.dep_count() == 0 {
return true;
}
let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
let mut stack: Vec<NodeId> = rec.dep_ids_vec();
while let Some(id) = stack.pop() {
if !visited.insert(id) {
continue;
}
if s.pending_fires.contains(&id) {
return false;
}
if let Some(r) = s.nodes.get(&id) {
for dep_id in r.dep_ids() {
if !visited.contains(&dep_id) {
stack.push(dep_id);
}
}
}
}
true
}
pub(crate) fn fire_fn(&self, node_id: NodeId) {
let op = {
let s = self.lock_state();
s.nodes.get(&node_id).and_then(|r| r.op)
};
match op {
Some(operator_op) => self.fire_operator(node_id, operator_op),
None => {
self.fire_regular(node_id);
}
}
}
#[allow(clippy::too_many_lines)] fn fire_regular(&self, node_id: NodeId) {
enum FireAction {
None,
SingleData(HandleId),
Batch(SmallVec<[FnEmission; 2]>),
}
let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
let mut s = self.lock_state();
s.pending_fires.remove(&node_id);
let rec = s.require_node(node_id);
if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
None
} else {
rec.fn_id.map(|fn_id| {
let dep_batches: Vec<DepBatch> = rec
.dep_records
.iter()
.map(|dr| DepBatch {
data: dr.data_batch.clone(),
prev_data: dr.prev_data,
involved: dr.involved_this_wave,
})
.collect();
(fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
})
}
};
let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
return;
};
if has_fired_once {
self.binding
.cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
}
let result = {
let _firing = FiringGuard::new(self, node_id);
self.binding.invoke_fn(node_id, fn_id, &dep_batches)
};
let action: FireAction = {
let mut s = self.lock_state();
if s.require_node(node_id).terminal.is_some() {
match &result {
FnResult::Data { handle, .. } => {
self.binding.release_handle(*handle);
}
FnResult::Batch { emissions, .. } => {
for em in emissions {
match em {
FnEmission::Data(h) | FnEmission::Error(h) => {
self.binding.release_handle(*h);
}
FnEmission::Complete => {}
}
}
}
FnResult::Noop { .. } => {}
}
return;
}
let rec = s.require_node_mut(node_id);
rec.has_fired_once = true;
if is_dynamic {
let tracked = match &result {
FnResult::Data { tracked, .. }
| FnResult::Noop { tracked }
| FnResult::Batch { tracked, .. } => tracked.clone(),
};
if let Some(t) = tracked {
rec.tracked = t.into_iter().collect();
}
}
match result {
FnResult::Noop { .. } => {
let already_dirty = s.require_node(node_id).dirty;
let already_tier3 = s
.pending_notify
.get(&node_id)
.is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
if already_dirty && !already_tier3 {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
FireAction::None
}
FnResult::Data { handle, .. } => FireAction::SingleData(handle),
FnResult::Batch { emissions, .. } if emissions.is_empty() => {
let already_dirty = s.require_node(node_id).dirty;
let already_tier3 = s
.pending_notify
.get(&node_id)
.is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
if already_dirty && !already_tier3 {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
FireAction::None
}
FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
}
};
match action {
FireAction::None => {}
FireAction::SingleData(handle) => {
self.commit_emission(node_id, handle);
}
FireAction::Batch(emissions) => {
self.commit_batch(node_id, emissions);
}
}
}
fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
let mut iter = emissions.into_iter();
for em in iter.by_ref() {
match em {
FnEmission::Data(handle) => {
self.commit_emission_verbatim(node_id, handle);
}
FnEmission::Complete => {
self.complete(node_id);
break;
}
FnEmission::Error(handle) => {
self.error(node_id, handle);
break;
}
}
}
for em in iter {
match em {
FnEmission::Data(h) | FnEmission::Error(h) => {
self.binding.release_handle(h);
}
FnEmission::Complete => {}
}
}
}
#[allow(clippy::too_many_lines)]
pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
assert!(
new_handle != NO_HANDLE,
"NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
);
let snapshot = {
let s = self.lock_state();
let rec = s.require_node(node_id);
if rec.terminal.is_some() {
drop(s);
self.binding.release_handle(new_handle);
return;
}
(rec.cache, rec.equals)
};
let (old_handle, equals_mode) = snapshot;
let is_subsequent_emit_in_wave = tier3_check(node_id);
if is_subsequent_emit_in_wave {
self.rewrite_prior_resolved_to_data(node_id);
self.commit_emission_verbatim(node_id, new_handle);
return;
}
let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
let mut s = self.lock_state();
if s.require_node(node_id).terminal.is_some() {
drop(s);
self.binding.release_handle(new_handle);
return;
}
let already_dirty = s.require_node(node_id).dirty;
s.require_node_mut(node_id).dirty = true;
if !already_dirty {
self.queue_notify(&mut s, node_id, Message::Dirty);
}
if is_data {
let current_cache = s.require_node(node_id).cache;
let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
use std::collections::hash_map::Entry;
let mut cps = self.lock_cross_partition();
match cps.wave_cache_snapshots.entry(node_id) {
Entry::Vacant(slot) => {
slot.insert(current_cache);
true
}
Entry::Occupied(_) => false,
}
} else {
false
};
s.require_node_mut(node_id).cache = new_handle;
if current_cache != NO_HANDLE && !snapshot_taken {
self.binding.release_handle(current_cache);
}
self.push_replay_buffer(&mut s, node_id, new_handle);
tier3_mark(node_id);
self.queue_notify(&mut s, node_id, Message::Data(new_handle));
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
}
}
} else {
let current_cache = s.require_node(node_id).cache;
if s.in_tick && current_cache != NO_HANDLE {
use std::collections::hash_map::Entry;
let mut cps = self.lock_cross_partition();
if let Entry::Vacant(slot) = cps.wave_cache_snapshots.entry(node_id) {
self.binding.retain_handle(current_cache);
slot.insert(current_cache);
}
}
tier3_mark(node_id);
self.queue_notify(&mut s, node_id, Message::Resolved);
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
for child_id in child_ids {
let already_involved = s.require_node(child_id).involved_this_wave;
if !already_involved {
{
let child = s.require_node_mut(child_id);
child.involved_this_wave = true;
child.dirty = true;
}
self.queue_notify(&mut s, child_id, Message::Dirty);
auto_resolve_inserts.push(child_id);
}
}
if !auto_resolve_inserts.is_empty() {
let mut cps = self.lock_cross_partition();
cps.pending_auto_resolve.extend(auto_resolve_inserts);
}
}
}
fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
let mut s = self.lock_state();
let snapshot = match self
.lock_cross_partition()
.wave_cache_snapshots
.get(&node_id)
.copied()
{
Some(h) if h != NO_HANDLE => h,
_ => return,
};
let mut retains_needed = 0u32;
if let Some(entry) = s.pending_notify.get_mut(&node_id) {
for msg in entry.iter_messages_mut() {
if matches!(msg, Message::Resolved) {
*msg = Message::Data(snapshot);
retains_needed += 1;
}
}
}
if let Some(rec) = s.nodes.get_mut(&node_id) {
if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
for msg in &mut *buffer {
if matches!(msg, Message::Resolved) {
*msg = Message::Data(snapshot);
retains_needed += 1;
}
}
}
}
drop(s);
for _ in 0..retains_needed {
self.binding.retain_handle(snapshot);
}
}
fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
if a == b {
return true; }
if a == NO_HANDLE || b == NO_HANDLE {
return false;
}
match mode {
EqualsMode::Identity => false,
EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
}
}
fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
assert!(
new_handle != NO_HANDLE,
"NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
);
let mut s = self.lock_state();
let rec = s.require_node(node_id);
if rec.terminal.is_some() {
drop(s);
self.binding.release_handle(new_handle);
return;
}
let already_dirty = s.require_node(node_id).dirty;
s.require_node_mut(node_id).dirty = true;
if !already_dirty {
self.queue_notify(&mut s, node_id, Message::Dirty);
}
let current_cache = s.require_node(node_id).cache;
let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
use std::collections::hash_map::Entry;
let mut cps = self.lock_cross_partition();
match cps.wave_cache_snapshots.entry(node_id) {
Entry::Vacant(slot) => {
slot.insert(current_cache);
true
}
Entry::Occupied(_) => false,
}
} else {
false
};
s.require_node_mut(node_id).cache = new_handle;
if current_cache != NO_HANDLE && !snapshot_taken {
self.binding.release_handle(current_cache);
}
self.push_replay_buffer(&mut s, node_id, new_handle);
tier3_mark(node_id);
self.queue_notify(&mut s, node_id, Message::Data(new_handle));
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
}
}
}
fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
let rec = s.require_node_mut(node_id);
let cap = match rec.replay_buffer_cap {
Some(c) if c > 0 => c,
_ => return,
};
self.binding.retain_handle(new_handle);
rec.replay_buffer.push_back(new_handle);
let evicted = if rec.replay_buffer.len() > cap {
rec.replay_buffer.pop_front()
} else {
None
};
if let Some(h) = evicted {
self.lock_cross_partition().deferred_handle_releases.push(h);
}
}
fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
let proceed = {
let mut s = self.lock_state();
s.pending_fires.remove(&node_id);
let rec = s.require_node(node_id);
if rec.terminal.is_some() {
false
} else {
let has_real_input = !rec.has_sentinel_deps()
|| rec.dep_records.iter().any(|dr| dr.terminal.is_some());
rec.partial || has_real_input
}
};
if !proceed {
return;
}
let _firing = FiringGuard::new(self, node_id);
match op {
OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
OperatorOp::DistinctUntilChanged { equals_fn_id } => {
self.fire_op_distinct(node_id, equals_fn_id);
}
OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
OperatorOp::WithLatestFrom { pack_fn } => {
self.fire_op_with_latest_from(node_id, pack_fn);
}
OperatorOp::Merge => self.fire_op_merge(node_id),
OperatorOp::Take { count } => self.fire_op_take(node_id, count),
OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
OperatorOp::Last { .. } => self.fire_op_last(node_id),
}
}
fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
let s = self.lock_state();
let rec = s.require_node(node_id);
debug_assert!(
!rec.dep_records.is_empty(),
"transform operator must have ≥1 dep"
);
let dr = &rec.dep_records[0];
(dr.data_batch.iter().copied().collect(), dr.terminal)
}
fn settle_dirty_resolved(&self, node_id: NodeId) {
let mut s = self.lock_state();
if s.require_node(node_id).terminal.is_some() {
return;
}
let already_dirty = s.require_node(node_id).dirty;
s.require_node_mut(node_id).dirty = true;
if !already_dirty {
self.queue_notify(&mut s, node_id, Message::Dirty);
}
let already_tier3 = s
.pending_notify
.get(&node_id)
.is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
if !already_tier3 {
self.queue_notify(&mut s, node_id, Message::Resolved);
}
}
fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let outputs = self.binding.project_each(fn_id, &inputs);
for h in outputs {
self.commit_emission_verbatim(node_id, h);
}
}
fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let pass = self.binding.predicate_each(fn_id, &inputs);
debug_assert!(
pass.len() == inputs.len(),
"predicate_each returned {} bools for {} inputs",
pass.len(),
inputs.len()
);
let mut emitted = 0usize;
for (i, &h) in inputs.iter().enumerate() {
if pass.get(i).copied().unwrap_or(false) {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
emitted += 1;
}
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
use crate::op_state::ScanState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let acc = {
let s = self.lock_state();
scratch_ref::<ScanState>(&s, node_id).acc
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let new_states = self.binding.fold_each(fn_id, acc, &inputs);
debug_assert!(
new_states.len() == inputs.len(),
"fold_each returned {} accs for {} inputs",
new_states.len(),
inputs.len()
);
let last_acc = new_states.last().copied();
if let Some(last) = last_acc {
let prev_acc = {
let mut s = self.lock_state();
let scratch = scratch_mut::<ScanState>(&mut s, node_id);
let prev = scratch.acc;
scratch.acc = last;
prev
};
self.binding.retain_handle(last);
if prev_acc != crate::handle::NO_HANDLE {
self.binding.release_handle(prev_acc);
}
}
for h in new_states {
self.commit_emission_verbatim(node_id, h);
}
}
fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
use crate::op_state::ReduceState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
let acc = {
let s = self.lock_state();
scratch_ref::<ReduceState>(&s, node_id).acc
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
let new_states = if inputs.is_empty() {
SmallVec::<[HandleId; 1]>::new()
} else {
self.binding.fold_each(fn_id, acc, &inputs)
};
debug_assert!(
new_states.len() == inputs.len(),
"fold_each returned {} accs for {} inputs",
new_states.len(),
inputs.len()
);
let last_acc = new_states.last().copied();
let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
new_states[..new_states.len() - 1].to_vec()
} else {
Vec::new()
};
let prev_acc_to_release = if let Some(last) = last_acc {
let prev_acc = {
let mut s = self.lock_state();
let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
let prev = scratch.acc;
scratch.acc = last;
prev
};
self.binding.retain_handle(last);
if prev_acc == crate::handle::NO_HANDLE {
None
} else {
Some(prev_acc)
}
} else {
None
};
for h in intermediates_to_release {
self.binding.release_handle(h);
}
if let Some(h) = prev_acc_to_release {
self.binding.release_handle(h);
}
match terminal {
None => {
}
Some(TerminalKind::Complete) => {
let final_acc = {
let s = self.lock_state();
scratch_ref::<ReduceState>(&s, node_id).acc
};
if final_acc != crate::handle::NO_HANDLE {
self.binding.retain_handle(final_acc);
self.commit_emission_verbatim(node_id, final_acc);
}
self.complete(node_id);
}
Some(TerminalKind::Error(h)) => {
self.binding.retain_handle(h);
self.error(node_id, h);
}
}
}
fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
use crate::op_state::DistinctState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let mut prev = {
let s = self.lock_state();
scratch_ref::<DistinctState>(&s, node_id).prev
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
if prev != crate::handle::NO_HANDLE {
self.binding.retain_handle(prev);
}
let mut emitted = 0usize;
for &h in &inputs {
let equal = if prev == crate::handle::NO_HANDLE {
false
} else if prev == h {
true
} else {
self.binding.custom_equals(equals_fn_id, prev, h)
};
if !equal {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
self.binding.retain_handle(h);
let old_prev = prev;
prev = h;
if old_prev != crate::handle::NO_HANDLE {
self.binding.release_handle(old_prev);
}
emitted += 1;
}
}
{
let mut s = self.lock_state();
let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
let stale_slot = scratch.prev;
scratch.prev = prev;
if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
drop(s);
self.binding.release_handle(stale_slot);
}
}
if emitted == 0 && prev != crate::handle::NO_HANDLE {
self.binding.release_handle(prev);
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
use crate::op_state::PairwiseState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let mut prev = {
let s = self.lock_state();
scratch_ref::<PairwiseState>(&s, node_id).prev
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let mut emitted = 0usize;
for &h in &inputs {
if prev == crate::handle::NO_HANDLE {
self.binding.retain_handle(h);
prev = h;
continue;
}
let packed = self.binding.pairwise_pack(fn_id, prev, h);
self.commit_emission_verbatim(node_id, packed);
self.binding.retain_handle(h);
let old_prev = prev;
prev = h;
self.binding.release_handle(old_prev);
emitted += 1;
}
{
let mut s = self.lock_state();
let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
let stale_slot = scratch.prev;
scratch.prev = prev;
if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
drop(s);
self.binding.release_handle(stale_slot);
}
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
let s = self.lock_state();
let rec = s.require_node(node_id);
let primary_fired = rec
.dep_records
.first()
.is_some_and(|dr| !dr.data_batch.is_empty());
let latest: SmallVec<[HandleId; 4]> = rec
.dep_records
.iter()
.map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
.collect();
(latest, primary_fired)
}
fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if latest.contains(&crate::handle::NO_HANDLE) {
self.settle_dirty_resolved(node_id);
return;
}
let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
self.commit_emission_verbatim(node_id, tuple_handle);
}
fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
let first_fire = {
let mut s = self.lock_state();
let rec = s.require_node_mut(node_id);
let was_first = !rec.has_fired_once;
rec.has_fired_once = true;
was_first
};
if !first_fire && !primary_fired {
self.settle_dirty_resolved(node_id);
return;
}
debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
if latest[1] == crate::handle::NO_HANDLE {
self.settle_dirty_resolved(node_id);
return;
}
let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
self.commit_emission_verbatim(node_id, tuple_handle);
}
fn fire_op_merge(&self, node_id: NodeId) {
let all_handles: Vec<HandleId> = {
let s = self.lock_state();
let rec = s.require_node(node_id);
rec.dep_records
.iter()
.flat_map(|dr| dr.data_batch.iter().copied())
.collect()
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if all_handles.is_empty() {
self.settle_dirty_resolved(node_id);
return;
}
for &h in &all_handles {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
}
fn fire_op_take(&self, node_id: NodeId, count: u32) {
use crate::op_state::TakeState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
let mut count_emitted = {
let s = self.lock_state();
scratch_ref::<TakeState>(&s, node_id).count_emitted
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if count_emitted >= count {
self.complete(node_id);
return;
}
for &h in &inputs {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
count_emitted = count_emitted.saturating_add(1);
if count_emitted >= count {
break;
}
}
{
let mut s = self.lock_state();
scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
}
if count_emitted >= count {
self.complete(node_id);
return;
}
if inputs.is_empty() && terminal.is_none() {
self.settle_dirty_resolved(node_id);
}
}
fn fire_op_skip(&self, node_id: NodeId, count: u32) {
use crate::op_state::SkipState;
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
let mut count_skipped = {
let s = self.lock_state();
scratch_ref::<SkipState>(&s, node_id).count_skipped
};
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
let mut emitted = 0usize;
for &h in &inputs {
if count_skipped < count {
count_skipped = count_skipped.saturating_add(1);
continue;
}
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
emitted += 1;
}
{
let mut s = self.lock_state();
scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if inputs.is_empty() {
return;
}
let pass = self.binding.predicate_each(fn_id, &inputs);
debug_assert!(
pass.len() == inputs.len(),
"predicate_each returned {} bools for {} inputs",
pass.len(),
inputs.len()
);
let mut emitted = 0usize;
let mut first_false_seen = false;
for (i, &h) in inputs.iter().enumerate() {
if pass.get(i).copied().unwrap_or(false) {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
emitted += 1;
} else {
first_false_seen = true;
break;
}
}
if first_false_seen {
self.complete(node_id);
return;
}
if emitted == 0 {
self.settle_dirty_resolved(node_id);
}
}
fn fire_op_last(&self, node_id: NodeId) {
use crate::op_state::LastState;
let (inputs, terminal) = self.snapshot_op_dep0(node_id);
{
let mut s = self.lock_state();
s.require_node_mut(node_id).has_fired_once = true;
}
if let Some(&new_latest) = inputs.last() {
let prev_latest = {
let mut s = self.lock_state();
let scratch = scratch_mut::<LastState>(&mut s, node_id);
let prev = scratch.latest;
scratch.latest = new_latest;
prev
};
self.binding.retain_handle(new_latest);
if prev_latest != crate::handle::NO_HANDLE {
self.binding.release_handle(prev_latest);
}
}
match terminal {
None => {}
Some(TerminalKind::Complete) => {
let (latest, default) = {
let s = self.lock_state();
let scratch = scratch_ref::<LastState>(&s, node_id);
(scratch.latest, scratch.default)
};
let to_emit = if latest != crate::handle::NO_HANDLE {
Some(latest)
} else if default != crate::handle::NO_HANDLE {
Some(default)
} else {
None
};
if let Some(h) = to_emit {
self.binding.retain_handle(h);
self.commit_emission_verbatim(node_id, h);
}
self.complete(node_id);
}
Some(TerminalKind::Error(h)) => {
self.binding.retain_handle(h);
self.error(node_id, h);
}
}
}
pub(crate) fn deliver_data_to_consumer(
&self,
s: &mut CoreState,
consumer_id: NodeId,
dep_idx: usize,
handle: HandleId,
) {
self.binding.retain_handle(handle);
let is_dynamic;
let is_state;
let tracked_or_first_fire;
let suppressed_for_default_pause;
{
let consumer = s.require_node_mut(consumer_id);
consumer.dep_records[dep_idx].data_batch.push(handle);
consumer.dep_records[dep_idx].involved_this_wave = true;
consumer.involved_this_wave = true;
is_dynamic = consumer.is_dynamic;
is_state = consumer.is_state();
tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
suppressed_for_default_pause = consumer.pause_state.is_paused()
&& consumer.pausable == crate::node::PausableMode::Default;
if suppressed_for_default_pause {
consumer.pause_state.mark_pending_wave();
}
}
if suppressed_for_default_pause {
return;
}
if is_state {
} else if is_dynamic {
if tracked_or_first_fire {
s.pending_fires.insert(consumer_id);
}
} else {
s.pending_fires.insert(consumer_id);
}
}
pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
#[cfg(debug_assertions)]
if matches!(msg.tier(), 3) {
if let Some(entry) = s.pending_notify.get(&node_id) {
let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
let resolved_count = entry
.iter_messages()
.filter(|m| matches!(m, Message::Resolved))
.count();
let incoming_is_data = matches!(msg, Message::Data(_));
if incoming_is_data {
debug_assert!(
resolved_count == 0,
"R1.3.3.a violation at {node_id:?}: queueing Data into a \
wave that already contains Resolved — Slice G should have \
prevented this via wave-end coalescing"
);
} else {
debug_assert!(
!has_data,
"R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
wave that already contains Data"
);
debug_assert!(
resolved_count == 0,
"R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
wave at one node"
);
}
}
}
let buffered_tier = matches!(msg.tier(), 3 | 4);
let cap = s.pause_buffer_cap;
{
let rec = s.require_node_mut(node_id);
if rec.subscribers.is_empty() {
return;
}
let mode_buffers_tier3 = match rec.pausable {
crate::node::PausableMode::ResumeAll => true,
crate::node::PausableMode::Default => rec.is_state(),
crate::node::PausableMode::Off => false,
};
if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
if let Some(h) = msg.payload_handle() {
self.binding.retain_handle(h);
}
let push_result = rec.pause_state.push_buffered(msg, cap);
for dm in push_result.dropped_msgs {
if let Some(h) = dm.payload_handle() {
self.binding.release_handle(h);
}
}
if push_result.first_overflow_this_cycle {
if let Some((dropped_count, lock_held_ns)) =
rec.pause_state.overflow_diagnostic()
{
self.lock_cross_partition().pending_pause_overflow.push(
crate::node::PendingPauseOverflow {
node_id,
dropped_count,
configured_max: cap.unwrap_or(0),
lock_held_ns,
},
);
}
}
return;
}
}
if let Some(h) = msg.payload_handle() {
self.binding.retain_handle(h);
}
Self::push_into_pending_notify(s, node_id, msg);
}
fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
let current_rev = s.require_node(node_id).subscribers_revision;
let needs_new_batch = s.pending_notify.get(&node_id).is_none_or(|entry| {
entry
.batches
.last()
.is_none_or(|b| b.snapshot_revision != current_rev)
});
let sinks_snapshot: Vec<Sink> = if needs_new_batch {
s.require_node(node_id)
.subscribers
.values()
.cloned()
.collect()
} else {
Vec::new()
};
match s.pending_notify.entry(node_id) {
Entry::Vacant(slot) => {
let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
batches.push(PendingBatch {
snapshot_revision: current_rev,
sinks: sinks_snapshot,
messages: vec![msg],
});
slot.insert(PendingPerNode { batches });
}
Entry::Occupied(mut slot) => {
let entry = slot.get_mut();
if needs_new_batch {
entry.batches.push(PendingBatch {
snapshot_revision: current_rev,
sinks: sinks_snapshot,
messages: vec![msg],
});
} else {
entry
.batches
.last_mut()
.expect("non-empty by construction (entry exists implies batch exists)")
.messages
.push(msg);
}
}
}
}
fn flush_notifications(&self, s: &mut CoreState) {
const PHASES: &[&[u8]] = &[
&[1], &[3, 4], &[5], &[6], ];
let pending = std::mem::take(&mut s.pending_notify);
for &phase_tiers in PHASES {
for (_node_id, entry) in &pending {
for batch in &entry.batches {
if batch.sinks.is_empty() {
continue;
}
let phase_msgs: Vec<Message> = batch
.messages
.iter()
.copied()
.filter(|m| phase_tiers.contains(&m.tier()))
.collect();
if phase_msgs.is_empty() {
continue;
}
let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
s.deferred_flush_jobs.push((sinks_clone, phase_msgs));
}
}
}
let mut cps = self.lock_cross_partition();
for entry in pending.values() {
for msg in entry.iter_messages() {
if let Some(h) = msg.payload_handle() {
cps.deferred_handle_releases.push(h);
}
}
}
}
pub(crate) fn drain_deferred(
s: &mut CoreState,
cps: &mut crate::node::CrossPartitionState,
) -> WaveDeferred {
(
std::mem::take(&mut s.deferred_flush_jobs),
std::mem::take(&mut cps.deferred_handle_releases),
std::mem::take(&mut s.deferred_cleanup_hooks),
std::mem::take(&mut s.pending_wipes),
)
}
pub(crate) fn fire_deferred(
&self,
jobs: DeferredJobs,
releases: Vec<HandleId>,
cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
pending_wipes: Vec<crate::handle::NodeId>,
) {
let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
for (sinks, msgs) in jobs {
for sink in &sinks {
let sink = sink.clone();
let msgs_ref = &msgs;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
sink(msgs_ref);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
}
for h in releases {
self.binding.release_handle(h);
}
for (node_id, trigger) in cleanup_hooks {
let binding = &self.binding;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
binding.cleanup_for(node_id, trigger);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
for node_id in pending_wipes {
let binding = &self.binding;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
binding.wipe_ctx(node_id);
}));
if let Err(payload) = result {
last_panic = Some(payload);
}
}
if let Some(payload) = last_panic {
std::panic::resume_unwind(payload);
}
}
pub fn batch<F>(&self, f: F)
where
F: FnOnce(),
{
let _guard = self.begin_batch();
f();
}
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub fn begin_batch(&self) -> BatchGuard {
for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
let epoch_before = self.registry.lock().epoch();
let partition_boxes = self.all_partitions_lock_boxes();
let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
for (sid, _box) in &partition_boxes {
let representative = crate::handle::NodeId::new(sid.raw());
wave_guards.push(self.partition_wave_owner_lock_arc(representative));
}
let epoch_after = self.registry.lock().epoch();
if epoch_after == epoch_before {
return self.begin_batch_with_guards(wave_guards);
}
drop(wave_guards);
std::thread::yield_now();
}
panic!(
"Core::begin_batch: exceeded {} retries — pathological concurrent \
register/union/split activity racing with closure-form batch entry",
crate::subgraph::MAX_LOCK_RETRIES
);
}
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
let epoch_before = self.registry.lock().epoch();
let touched = self.compute_touched_partitions(seed);
let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
for sid in &touched {
let representative = crate::handle::NodeId::new(sid.raw());
wave_guards.push(self.partition_wave_owner_lock_arc(representative));
}
let epoch_after = self.registry.lock().epoch();
if epoch_after == epoch_before {
return self.begin_batch_with_guards(wave_guards);
}
drop(wave_guards);
std::thread::yield_now();
}
panic!(
"Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
pathological concurrent register/union/split activity racing \
with per-seed batch entry",
crate::subgraph::MAX_LOCK_RETRIES
);
}
fn begin_batch_with_guards(
&self,
wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
) -> BatchGuard {
let owns_tick = {
let mut s = self.lock_state();
let was_in = s.in_tick;
if !was_in {
s.in_tick = true;
}
!was_in
};
if owns_tick {
tier3_clear();
}
BatchGuard {
core: self.clone(),
owns_tick,
wave_guards,
_not_send: std::marker::PhantomData,
}
}
}
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
core: Core,
owns_tick: bool,
wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
_not_send: std::marker::PhantomData<*const ()>,
}
impl Drop for BatchGuard {
fn drop(&mut self) {
if !self.owns_tick {
return;
}
if std::thread::panicking() {
let (pending, deferred_releases, restored_releases) = {
let mut s = self.core.lock_state();
let mut cps = self.core.lock_cross_partition();
let pending = std::mem::take(&mut s.pending_notify);
let _: DeferredJobs = std::mem::take(&mut s.deferred_flush_jobs);
s.pending_fires.clear();
let restored = self.core.restore_wave_cache_snapshots(&mut s, &mut cps);
s.clear_wave_state(&mut cps);
cps.clear_wave_state();
let deferred_releases = std::mem::take(&mut cps.deferred_handle_releases);
let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
std::mem::take(&mut s.deferred_cleanup_hooks);
let _: Vec<crate::handle::NodeId> = std::mem::take(&mut s.pending_wipes);
s.in_tick = false;
(pending, deferred_releases, restored)
};
for entry in pending.values() {
for msg in entry.iter_messages() {
if let Some(h) = msg.payload_handle() {
self.core.binding.release_handle(h);
}
}
}
for h in deferred_releases {
self.core.binding.release_handle(h);
}
for h in restored_releases {
self.core.binding.release_handle(h);
}
tier3_clear();
return;
}
self.core.drain_and_flush();
let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
let mut s = self.core.lock_state();
let mut cps = self.core.lock_cross_partition();
s.clear_wave_state(&mut cps);
cps.clear_wave_state();
s.in_tick = false;
let snapshot_releases = Core::drain_wave_cache_snapshots(&mut cps);
let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, &mut cps);
(jobs, releases, hooks, wipes, snapshot_releases)
};
self.core
.fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
for h in snapshot_releases {
self.core.binding.release_handle(h);
}
tier3_clear();
while let Some(guard) = self.wave_guards.pop() {
drop(guard);
}
}
}