use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Weak};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use indexmap::IndexMap;
use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
/// Guard handed out by `Core::partition_wave_owner_lock_arc` for one partition.
///
/// While it is alive the partition's re-entrant `wave_owner` lock stays held.
/// Its `Drop` impl calls `held_partitions::release` for `sid`.
pub(crate) struct WaveOwnerGuard {
// Partition id that was recorded through `held_partitions::check_and_acquire`.
sid: crate::subgraph::SubgraphId,
// Never read; stored only so the lock is held for the guard's lifetime.
#[allow(dead_code)]
inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
}
impl Drop for WaveOwnerGuard {
    /// Gives back one hold on `self.sid` in this thread's held-partition table.
    fn drop(&mut self) {
        // The flag (whether the table entry was removed entirely) is not needed here.
        let _entry_removed: bool = held_partitions::release(self.sid);
    }
}
/// Per-thread bookkeeping of the partitions the current thread holds.
///
/// `check_and_acquire` records a hold (and panics on an ordering violation),
/// `release` undoes one hold. A separate per-thread counter marks
/// "producer build" sections, during which the ordering check is skipped.
mod held_partitions {
use crate::subgraph::SubgraphId;
use smallvec::SmallVec;
use std::cell::RefCell;
// Inline capacity of the per-thread table before it spills to the heap.
const HELD_INLINE: usize = 4;
thread_local! {
// (partition id, hold count) pairs for the current thread.
static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
= const { RefCell::new(SmallVec::new_const()) };
// Nesting depth of producer-build sections on the current thread.
static IN_PRODUCER_BUILD: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}
/// Increments the current thread's producer-build depth.
///
/// # Panics
/// Panics if the `u32` depth counter would overflow.
pub fn producer_build_enter() {
IN_PRODUCER_BUILD.with(|c| {
let next = c.get().checked_add(1).expect(
"in_producer_build refcount overflow — unbounded \
producer-build re-entrance. Should be bounded by the \
protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
);
c.set(next);
});
}
/// Decrements the current thread's producer-build depth, saturating at zero
/// (an unbalanced exit is silently ignored).
pub fn producer_build_exit() {
IN_PRODUCER_BUILD.with(|c| c.set(c.get().saturating_sub(1)));
}
/// `true` while the current thread is inside at least one producer-build section.
fn in_producer_build() -> bool {
IN_PRODUCER_BUILD.with(|c| c.get() > 0)
}
/// Records that the current thread is about to hold partition `sid`.
///
/// If `sid` is already in the table its hold count is incremented; otherwise a
/// new entry with count 1 is pushed.
///
/// # Panics
/// - When the thread already holds some partition, is not inside a
///   producer-build section, does not already hold `sid`, and `sid` is not
///   strictly greater than every held id.
/// - When the hold count for `sid` would overflow `u32`.
pub(crate) fn check_and_acquire(sid: SubgraphId) {
HELD.with(|h| {
let mut held = h.borrow_mut();
let already_held = held.iter().any(|(s, _)| *s == sid);
// The ordering rule applies only to a NEW partition acquired while others are held.
if !held.is_empty() && !in_producer_build() && !already_held {
if let Some(max_held) = held.iter().map(|(s, _)| *s).max() {
if sid <= max_held {
let new_id = sid;
// Release the RefCell borrow before unwinding starts.
drop(held);
panic!(
"Phase H+ ascending-order violation: thread tried \
to acquire partition {new_id:?} while already \
holding partition {max_held:?}. \
The same-thread cross-partition lock-acquisition \
protocol requires every NEW partition acquired \
while ANY partition is already held to have id \
strictly greater than every already-held \
partition; otherwise two threads doing \
reciprocal acquires can form an AB/BA deadlock.\n\
\n\
Note: this check is per-thread. A cross-thread \
AB/BA cycle between threads each obeying \
ascending order at the per-thread level is \
prevented at a different layer — the \
`compute_touched_partitions` upfront-acquire-\
all-ascending rule in `Core::begin_batch_for`.\n\
\n\
Common triggers (see docs/porting-deferred.md \
'Cross-partition acquire-during-fire deadlock'):\n\
- A user fn (derived / dynamic) that calls \
`Core::emit` / `complete` / `error` / `teardown` \
/ `invalidate` mid-fire on a node in a partition \
with a smaller id than the firing node's.\n\
- A sink callback that calls `Core::subscribe` \
on a node in a smaller-id partition while the \
outer wave's wave_owner is still held.\n\
- A subscribe handshake (or Drop cleanup) that \
re-enters Core on a smaller-id partition.\n\
\n\
Fix: schedule the cross-partition operation \
OUTSIDE the wave (e.g., via a deferred queue \
applied at wave end) so the acquire happens at \
top-level rather than nested under a held \
partition; OR declare the cross-partition \
reachability upfront via `add_meta_companion` \
so the wave acquires both partitions ascending \
at top-level and the inner re-entry becomes a \
re-entrant acquire on a held partition."
);
}
}
}
// Re-entrant hold: bump the existing count; otherwise start a new entry.
if let Some((_, count)) = held.iter_mut().find(|(s, _)| *s == sid) {
*count = count.checked_add(1).expect(
"held_partitions refcount overflow — unbounded \
same-partition re-entrance. Should be bounded by the \
protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
);
} else {
held.push((sid, 1));
}
});
}
/// Undoes one hold on `sid` for the current thread.
///
/// Returns `true` when the hold count reached zero and the entry was removed,
/// `false` otherwise. A missing entry or a zero count trips a `debug_assert!`
/// in debug builds; in release builds a missing entry just returns `false`.
pub(crate) fn release(sid: SubgraphId) -> bool {
HELD.with(|h| {
let mut held = h.borrow_mut();
if let Some(idx) = held.iter().position(|(s, _)| *s == sid) {
let count = &mut held[idx].1;
debug_assert!(
*count > 0,
"held_partitions::release({sid:?}): refcount underflow — \
release without matching check_and_acquire (caller bug)"
);
*count = count.saturating_sub(1);
if *count == 0 {
// Entry order in the table is not significant, so O(1) removal is fine.
held.swap_remove(idx);
return true;
}
} else {
debug_assert!(
false,
"held_partitions::release({sid:?}): sid not in HELD — \
double-drop or stray release (caller bug)"
);
}
false
})
}
/// Snapshot of the current thread's held table, sorted by partition id.
#[cfg(any(test, debug_assertions))]
#[must_use]
pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
let mut v: Vec<(SubgraphId, u32)> = HELD.with(|h| h.borrow().to_vec());
v.sort_unstable_by_key(|(s, _)| *s);
v
}
/// Current producer-build depth of the calling thread.
#[cfg(any(test, debug_assertions))]
#[must_use]
pub fn in_producer_build_for_tests() -> u32 {
IN_PRODUCER_BUILD.with(std::cell::Cell::get)
}
}
pub use held_partitions::{producer_build_enter, producer_build_exit};
#[cfg(any(test, debug_assertions))]
pub use held_partitions::{held_snapshot_for_tests, in_producer_build_for_tests};
use smallvec::SmallVec;
use thiserror::Error;
use crate::batch::PendingPerNode;
use crate::boundary::{BindingBoundary, CleanupTrigger};
use crate::clock::monotonic_ns;
use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
use crate::message::Message;
/// Terminal outcome recorded on a node or on a dep record.
///
/// `Core::subscribe` replays a stored terminal to a new subscriber as
/// `Message::Complete` or `Message::Error(handle)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TerminalKind {
/// Replayed as `Message::Complete`.
Complete,
/// Replayed as `Message::Error` carrying this handle.
Error(HandleId),
}
/// Shape classification of a node, as computed by `NodeRecord::kind`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum NodeKind {
/// No deps, no fn, no operator.
State,
/// No deps, has a fn, no operator.
Producer,
/// Has deps and is not flagged dynamic (and carries no operator).
Derived,
/// Has deps and is flagged dynamic (and carries no operator).
Dynamic,
/// Carries an operator; takes precedence over every other classification.
Operator(OperatorOp),
}
impl NodeKind {
    /// Returns `true` only for operator nodes whose op is `Reduce` or `Last`.
    pub(crate) fn skips_auto_cascade(self) -> bool {
        match self {
            NodeKind::Operator(op) => {
                matches!(op, OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
            }
            NodeKind::State | NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic => false,
        }
    }
}
/// Operator selector attached to an operator node.
///
/// Payloads are function ids, handles or counts. The evaluation semantics of
/// each operator live outside this type; only what registration does with a
/// variant is noted below.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OperatorOp {
/// No per-node scratch state is created at registration.
Map { fn_id: FnId },
/// No per-node scratch state is created at registration.
Filter { fn_id: FnId },
/// `seed` must not be `NO_HANDLE`; it is retained and used as the initial accumulator.
Scan { fn_id: FnId, seed: HandleId },
/// `seed` must not be `NO_HANDLE`; it is retained and used as the initial accumulator.
/// `NodeKind::skips_auto_cascade` is `true` for this operator.
Reduce { fn_id: FnId, seed: HandleId },
/// Registered with default `DistinctState` scratch.
DistinctUntilChanged { equals_fn_id: FnId },
/// Registered with default `PairwiseState` scratch.
Pairwise { fn_id: FnId },
/// No per-node scratch state is created at registration.
Combine { pack_fn: FnId },
/// No per-node scratch state is created at registration.
WithLatestFrom { pack_fn: FnId },
/// No per-node scratch state is created at registration.
Merge,
/// Registered with default `TakeState` scratch.
Take { count: u32 },
/// Registered with default `SkipState` scratch.
Skip { count: u32 },
/// Registered with `TakeWhileState` scratch.
TakeWhile { fn_id: FnId },
/// `default` is retained at registration unless it is `NO_HANDLE`.
/// `NodeKind::skips_auto_cascade` is `true` for this operator.
Last { default: HandleId },
}
/// Options accepted by `Core::register_operator`; both fields are copied
/// into the `NodeOpts` used for the registration.
#[derive(Copy, Clone, Debug)]
pub struct OperatorOpts {
/// Equality mode stored on the node.
pub equals: EqualsMode,
/// Stored on the node as its `partial` flag.
pub partial: bool,
}
impl Default for OperatorOpts {
fn default() -> Self {
Self {
equals: EqualsMode::Identity,
partial: false,
}
}
}
/// Compute attachment of a registration: either a function id or an operator.
/// `Core::register` stores it as the node's `fn_id` or `op` respectively.
#[derive(Copy, Clone, Debug)]
pub enum NodeFnOrOp {
/// Stored as the node's `fn_id`.
Fn(FnId),
/// Stored as the node's `op`.
Op(OperatorOp),
}
/// Per-node pause behaviour selector, stored on the node record.
///
/// NOTE(review): what each mode does is implemented outside this chunk;
/// only the variant set and the default are visible here.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
/// The mode used when none is specified.
#[default]
Default,
ResumeAll,
Off,
}
/// Options of a node registration (see `Core::register`).
#[derive(Copy, Clone, Debug)]
pub struct NodeOpts {
/// Initial cache handle. Anything other than `NO_HANDLE` is accepted only
/// for state-shaped nodes (no deps, no fn, no operator).
pub initial: HandleId,
/// Equality mode stored on the node.
pub equals: EqualsMode,
/// Stored on the node as its `partial` flag.
pub partial: bool,
/// Marks the node as dynamic; a dynamic fn node starts with no tracked dep indices.
pub is_dynamic: bool,
/// Pause behaviour stored on the node.
pub pausable: PausableMode,
/// Stored as the node's replay-buffer capacity.
pub replay_buffer: Option<usize>,
}
impl Default for NodeOpts {
fn default() -> Self {
Self {
initial: NO_HANDLE,
equals: EqualsMode::Identity,
partial: false,
is_dynamic: false,
pausable: PausableMode::Default,
replay_buffer: None,
}
}
}
/// Full description of a node handed to `Core::register`.
#[derive(Clone, Debug)]
pub struct NodeRegistration {
/// Ids of the nodes this node depends on; each must already be registered.
pub deps: Vec<NodeId>,
/// Optional fn or operator; `None` together with empty `deps` is a state node.
pub fn_or_op: Option<NodeFnOrOp>,
/// Remaining registration options.
pub opts: NodeOpts,
}
/// Equality mode stored on a node.
///
/// NOTE(review): how the mode is applied is not visible in this chunk.
#[derive(Copy, Clone, Debug)]
pub enum EqualsMode {
/// The default mode used by `NodeOpts` and `OperatorOpts`.
Identity,
/// Carries the id of a custom function.
Custom(FnId),
}
/// Key of a sink in a node's `subscribers` map.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct SubscriptionId(u64);
/// Handle returned by `Core::subscribe`. Dropping it removes the sink from
/// the node (if the core and the node still exist).
#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
pub struct Subscription {
// Weak so that an outstanding subscription does not keep the core state alive.
state: Weak<Mutex<CoreState>>,
// Node the sink was attached to.
node_id: NodeId,
// Key of the sink in the node's subscriber map.
sub_id: SubscriptionId,
}
impl Subscription {
    /// Id of the node this subscription is attached to.
    #[must_use]
    pub fn node_id(&self) -> NodeId {
        let Self { node_id, .. } = self;
        *node_id
    }
}
impl Drop for Subscription {
    /// Unsubscribes the sink. When it was the node's last subscriber, the
    /// applicable binding hooks run after the state lock has been released.
    fn drop(&mut self) {
        // The core is already gone: nothing to unsubscribe from.
        let Some(state) = self.state.upgrade() else {
            return;
        };
        let node_id = self.node_id;

        // Phase 1, under the state lock: remove the sink and decide which hooks apply.
        let (run_cleanup, run_deactivate, run_wipe, binding) = {
            let mut guard = state.lock();
            let Some(rec) = guard.nodes.get_mut(&node_id) else {
                return;
            };
            rec.subscribers.remove(&self.sub_id);
            rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);

            let now_empty = rec.subscribers.is_empty();
            let is_producer = rec.is_producer();
            let has_user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
            let fire_wipe = now_empty && rec.resubscribable && rec.terminal.is_some();

            // The binding is cloned only when at least one hook will actually run.
            let hooks_needed = now_empty && (is_producer || has_user_cleanup || fire_wipe);
            let binding = if hooks_needed {
                Some(guard.binding.clone())
            } else {
                None
            };
            (has_user_cleanup, is_producer, fire_wipe, binding)
        };

        // Phase 2, lock released: a binding is present only for the last subscriber.
        let Some(binding) = binding else {
            return;
        };
        if run_cleanup {
            binding.cleanup_for(node_id, CleanupTrigger::OnDeactivation);
        }
        if run_deactivate {
            binding.producer_deactivate(node_id);
        }
        if run_wipe {
            binding.wipe_ctx(node_id);
        }
    }
}
// Compile-time check that `Subscription` is `Send + Sync`; the closure is never called.
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Subscription>();
};
/// Subscriber callback: called with a slice of messages; shared through an
/// `Arc` and required to be `Send + Sync`.
pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
/// Pause status of a node.
#[derive(Debug)]
pub(crate) enum PauseState {
/// Not paused; no locks are held.
Active,
/// Paused by at least one lock.
Paused {
// Distinct lock ids currently pausing the node.
locks: SmallVec<[LockId; 2]>,
// Messages queued by `push_buffered` while paused (oldest at the front).
buffer: VecDeque<Message>,
// Number of messages evicted from `buffer` because of the cap (saturating).
dropped: u32,
// `monotonic_ns()` reading taken when the first lock was added.
started_at_ns: u64,
// Set once the first overflow of this pause cycle has been flagged.
overflow_reported: bool,
// Flag set by `mark_pending_wave` and consumed by `take_pending_wave`.
pending_wave: bool,
},
}
impl PauseState {
    /// `true` when the node is in the `Paused` state.
    pub(crate) fn is_paused(&self) -> bool {
        match self {
            Self::Paused { .. } => true,
            Self::Active => false,
        }
    }

    /// Number of locks currently pausing the node (0 when active).
    fn lock_count(&self) -> usize {
        if let Self::Paused { locks, .. } = self {
            locks.len()
        } else {
            0
        }
    }

    /// Whether `lock_id` is one of the locks pausing the node.
    fn contains_lock(&self, lock_id: LockId) -> bool {
        if let Self::Paused { locks, .. } = self {
            locks.contains(&lock_id)
        } else {
            false
        }
    }

    /// Adds `lock_id`. An active node becomes paused with a fresh, empty
    /// buffer; an already-paused node just gains the lock (duplicates ignored).
    fn add_lock(&mut self, lock_id: LockId) {
        if let Self::Paused { locks, .. } = self {
            if !locks.contains(&lock_id) {
                locks.push(lock_id);
            }
            return;
        }
        // Active -> Paused transition: this starts a new pause cycle.
        let mut locks = SmallVec::new();
        locks.push(lock_id);
        *self = Self::Paused {
            locks,
            buffer: VecDeque::new(),
            dropped: 0,
            started_at_ns: monotonic_ns(),
            overflow_reported: false,
            pending_wave: false,
        };
    }

    /// Sets the pending-wave flag; does nothing while active.
    pub(crate) fn mark_pending_wave(&mut self) {
        match self {
            Self::Paused { pending_wave, .. } => *pending_wave = true,
            Self::Active => {}
        }
    }

    /// Returns the pending-wave flag and clears it; `false` while active.
    pub(crate) fn take_pending_wave(&mut self) -> bool {
        match self {
            Self::Paused { pending_wave, .. } => std::mem::take(pending_wave),
            Self::Active => false,
        }
    }

    /// Removes `lock_id` (if present). When that leaves no locks the state
    /// becomes `Active` and the buffered messages plus the dropped-count are
    /// returned; in every other case the result is `None`.
    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
        let Self::Paused { locks, .. } = self else {
            return None;
        };
        if let Some(idx) = locks.iter().position(|held| *held == lock_id) {
            locks.swap_remove(idx);
        }
        if !locks.is_empty() {
            return None;
        }
        // Last lock gone: swap in `Active` and hand the old buffer to the caller.
        match std::mem::replace(self, Self::Active) {
            Self::Paused {
                buffer, dropped, ..
            } => Some((buffer, dropped)),
            Self::Active => None,
        }
    }

    /// Queues `msg` while paused, evicting from the front while the buffer
    /// exceeds `cap`. Evicted messages are returned, and the first overflow of
    /// a pause cycle is flagged. While active the message is discarded and an
    /// empty result is returned.
    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
        let mut outcome = PushBufferedResult::default();
        let Self::Paused {
            buffer,
            dropped,
            overflow_reported,
            ..
        } = self
        else {
            return outcome;
        };
        buffer.push_back(msg);
        if let Some(limit) = cap {
            while buffer.len() > limit {
                if let Some(evicted) = buffer.pop_front() {
                    outcome.dropped_msgs.push(evicted);
                }
                *dropped = dropped.saturating_add(1);
            }
        }
        let overflowed = !outcome.dropped_msgs.is_empty();
        if overflowed && !*overflow_reported {
            *overflow_reported = true;
            outcome.first_overflow_this_cycle = true;
        }
        outcome
    }

    /// While paused: `(dropped count, nanoseconds since the pause started)`.
    /// `None` while active.
    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
        let Self::Paused {
            dropped,
            started_at_ns,
            ..
        } = self
        else {
            return None;
        };
        let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
        Some((*dropped, lock_held_ns))
    }
}
/// Outcome of `PauseState::push_buffered`.
#[derive(Default)]
pub(crate) struct PushBufferedResult {
// Messages evicted from the front of the pause buffer by this push.
pub(crate) dropped_msgs: Vec<Message>,
// `true` only for the first push of a pause cycle that evicted something.
pub(crate) first_overflow_this_cycle: bool,
}
/// Record of a pause-buffer overflow, queued in
/// `CrossPartitionState::pending_pause_overflow`.
///
/// NOTE(review): the producer and consumer of these records are outside this
/// chunk; field meanings below are read off the names and types.
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
// Node whose pause buffer overflowed.
pub(crate) node_id: NodeId,
// Presumably the dropped-message count at report time.
pub(crate) dropped_count: u32,
// Presumably the buffer cap that was in force.
pub(crate) configured_max: usize,
// Presumably how long the pause had been held, in nanoseconds.
pub(crate) lock_held_ns: u64,
}
/// Error type for pause/resume operations.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum PauseError {
/// The given node id is not registered.
#[error("pause/resume: unknown node {0:?}")]
UnknownNode(NodeId),
}
/// Errors returned by `Core::up`.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum UpError {
/// The given node id is not registered.
#[error("up: unknown node {0:?}")]
UnknownNode(NodeId),
/// The message's tier is 3 or 5, which `Core::up` rejects.
#[error(
"up: tier {tier} is forbidden upstream — value (tier 3) and \
terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
)]
TierForbidden { tier: u8 },
}
/// Errors returned by `Core::register` and the `register_*` helpers.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
/// A listed dep id is not registered.
#[error("register: unknown dep {0:?}")]
UnknownDep(NodeId),
/// An operator registration had an empty dep list.
#[error(
"register: operator nodes require at least one dep — \
use register_producer for subscription-managed combinators"
)]
OperatorWithoutDeps,
/// `NodeOpts::initial` was set on a node that has deps, a fn or an operator.
#[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
InitialOnlyForStateNodes,
/// A listed dep has a terminal recorded and is not resubscribable.
#[error(
"register: dep {0:?} is terminal and not resubscribable; \
mark it resubscribable before terminating, or remove it from the dep list"
)]
TerminalDep(NodeId),
/// A `Scan` / `Reduce` seed was `NO_HANDLE`.
#[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
OperatorSeedSentinel,
}
/// Errors returned by `Core::set_pausable_mode`.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
/// The given node id is not registered.
#[error("set_pausable_mode: unknown node {0:?}")]
UnknownNode(NodeId),
/// The node is currently paused, so its mode cannot be changed.
#[error(
"set_pausable_mode: cannot change pausable mode while paused; \
resume all locks first"
)]
WhilePaused,
}
/// Per-dependency bookkeeping stored on the consuming node.
///
/// NOTE(review): the wave logic that updates these fields is outside this
/// chunk; the notes below cover only what is visible here.
pub(crate) struct DepRecord {
// Id of the dependency node.
pub(crate) node: NodeId,
// Starts as `NO_HANDLE`; together with an empty `data_batch` this marks a
// "sentinel" dep (see `NodeRecord::has_sentinel_deps`).
pub(crate) prev_data: HandleId,
// Wave bookkeeping flag; starts `false`.
pub(crate) dirty: bool,
// Wave bookkeeping flag; starts `false`.
pub(crate) involved_this_wave: bool,
// Handles collected for this dep; starts empty.
pub(crate) data_batch: SmallVec<[HandleId; 1]>,
// Terminal recorded for this dep, if any.
pub(crate) terminal: Option<TerminalKind>,
}
impl DepRecord {
fn new(node: NodeId) -> Self {
Self {
node,
prev_data: NO_HANDLE,
dirty: false,
involved_this_wave: false,
data_batch: SmallVec::new(),
terminal: None,
}
}
}
/// Everything the core stores about one registered node.
///
/// NOTE(review): several fields are driven by wave/batch logic outside this
/// chunk; notes below are limited to what this chunk shows.
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
// One record per dependency, in registration order.
pub(crate) dep_records: Vec<DepRecord>,
// Function id, when the node was registered with a fn.
pub(crate) fn_id: Option<FnId>,
// Operator, when the node was registered with one.
pub(crate) op: Option<OperatorOp>,
// Dynamic flag from `NodeOpts::is_dynamic`.
pub(crate) is_dynamic: bool,
// Equality mode from the registration options.
pub(crate) equals: EqualsMode,
// Cached value handle; `NO_HANDLE` when there is none. Sent to a new
// subscriber as `Message::Data` when set.
pub(crate) cache: HandleId,
// Initialised to `initial != NO_HANDLE` at registration.
pub(crate) has_fired_once: bool,
// Attached sinks keyed by subscription id.
pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
// Bumped (wrapping) whenever a subscribe/unsubscribe path touches `subscribers`.
pub(crate) subscribers_revision: u64,
// Indices into `dep_records`; registration fills in every index for
// operators and for non-dynamic fn nodes with deps, and leaves it empty otherwise.
pub(crate) tracked: HashSet<usize>,
// Wave bookkeeping flag; starts `false`.
pub(crate) dirty: bool,
// Wave bookkeeping flag; starts `false`.
pub(crate) involved_this_wave: bool,
// Current pause status and buffered messages.
pub(crate) pause_state: PauseState,
// Pause behaviour; can only be changed while not paused.
pub(crate) pausable: PausableMode,
// Replay-buffer capacity; `None` or 0 means nothing is replayed on subscribe.
pub(crate) replay_buffer_cap: Option<usize>,
// Handles replayed to new subscribers (oldest at the front).
pub(crate) replay_buffer: VecDeque<HandleId>,
// Terminal recorded for the node, if any.
pub(crate) terminal: Option<TerminalKind>,
// When set, new subscribers additionally receive `Message::Teardown`.
pub(crate) has_received_teardown: bool,
// Configured through `Core::set_resubscribable`; starts `false`.
pub(crate) resubscribable: bool,
// Extra nodes followed by `Core::compute_touched_partitions`.
pub(crate) meta_companions: Vec<NodeId>,
// `partial` flag from the registration options.
pub(crate) partial: bool,
// Operator scratch state created by `make_op_scratch`, when the operator needs one.
pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
impl NodeRecord {
pub(crate) fn is_state(&self) -> bool {
self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
}
pub(crate) fn is_producer(&self) -> bool {
self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
}
#[allow(dead_code)] pub(crate) fn is_compute(&self) -> bool {
!self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
}
#[allow(dead_code)] pub(crate) fn is_operator(&self) -> bool {
self.op.is_some()
}
pub(crate) fn skips_auto_cascade(&self) -> bool {
match self.op {
Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
None => false,
}
}
pub(crate) fn kind(&self) -> NodeKind {
if let Some(op) = self.op {
NodeKind::Operator(op)
} else if self.dep_records.is_empty() {
if self.fn_id.is_some() {
NodeKind::Producer
} else {
NodeKind::State
}
} else if self.is_dynamic {
NodeKind::Dynamic
} else {
NodeKind::Derived
}
}
pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
self.dep_records.iter().map(|r| r.node)
}
pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
self.dep_ids().collect()
}
pub(crate) fn dep_count(&self) -> usize {
self.dep_records.len()
}
pub(crate) fn has_sentinel_deps(&self) -> bool {
self.dep_records
.iter()
.any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
}
pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
self.dep_records.iter().position(|r| r.node == dep_id)
}
pub(crate) fn all_deps_terminal(&self) -> bool {
!self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
}
}
/// State shared across partitions, kept behind its own mutex on `Core`.
///
/// NOTE(review): the code that fills these collections is outside this chunk.
pub(crate) struct CrossPartitionState {
// Handles queued for release; whatever is left is released on drop.
pub(crate) deferred_handle_releases: Vec<HandleId>,
// Per-node handle snapshots; whatever is left is released on drop.
pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
// Cleared by `clear_wave_state`.
pub(crate) pending_auto_resolve: ahash::AHashSet<NodeId>,
// Cleared by `clear_wave_state`.
pub(crate) pending_pause_overflow: Vec<PendingPauseOverflow>,
// Binding used to release handles on drop.
binding: Arc<dyn BindingBoundary>,
}
impl CrossPartitionState {
fn new(binding: Arc<dyn BindingBoundary>) -> Self {
Self {
deferred_handle_releases: Vec::new(),
wave_cache_snapshots: HashMap::new(),
pending_auto_resolve: ahash::AHashSet::new(),
pending_pause_overflow: Vec::new(),
binding,
}
}
pub(crate) fn clear_wave_state(&mut self) {
self.pending_auto_resolve.clear();
self.pending_pause_overflow.clear();
}
}
impl Drop for CrossPartitionState {
    /// Releases every handle still owned here: first the cache snapshots,
    /// then the deferred releases.
    fn drop(&mut self) {
        let Self {
            wave_cache_snapshots,
            deferred_handle_releases,
            binding,
            ..
        } = self;
        for (_node, handle) in std::mem::take(wave_cache_snapshots) {
            binding.release_handle(handle);
        }
        for handle in std::mem::take(deferred_handle_releases) {
            binding.release_handle(handle);
        }
    }
}
/// Mutable core state guarded by `Core::state`.
///
/// NOTE(review): most of the wave/batch fields are driven by code outside
/// this chunk; notes below are limited to what this chunk shows.
pub(crate) struct CoreState {
// Id counters; all start at 1 except `next_lock_id`, which starts at `1 << 31`.
pub(crate) next_node_id: u64,
pub(crate) next_subscription_id: u64,
pub(crate) next_lock_id: u64,
// Every registered node.
pub(crate) nodes: HashMap<NodeId, NodeRecord>,
// Reverse edges: for each node, the nodes that list it as a dep.
pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
pub(crate) pending_fires: HashSet<NodeId>,
// Per-node pending batches (see `Core::pending_batch_count`).
pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
pub(crate) in_tick: bool,
// Configured by `Core::set_pause_buffer_cap`; `None` initially.
pub(crate) pause_buffer_cap: Option<usize>,
// Configured by `Core::set_max_batch_drain_iterations`; 10_000 initially, never 0.
pub(crate) max_batch_drain_iterations: u32,
pub(crate) deferred_flush_jobs: crate::batch::DeferredJobs,
// Clone of the binding, reachable while holding the state lock.
pub(crate) binding: Arc<dyn BindingBoundary>,
pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
pub(crate) next_topology_id: u64,
pub(crate) currently_firing: Vec<NodeId>,
pub(crate) invalidate_hooks_fired_this_wave: ahash::AHashSet<NodeId>,
pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
pub(crate) pending_wipes: Vec<NodeId>,
}
/// Cheaply clonable handle to the dispatcher: every field is an `Arc`, so
/// clones share the same state.
#[derive(Clone)]
pub struct Core {
// Main node/wave state; its `Arc` identity is what `same_dispatcher` compares.
pub(crate) state: Arc<Mutex<CoreState>>,
// State shared across partitions, behind its own mutex.
pub(crate) cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>,
// Binding used for handle retain/release and lifecycle hooks.
pub(crate) binding: Arc<dyn BindingBoundary>,
// Partition registry mapping nodes to partitions and their lock boxes.
pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
/// Non-owning counterpart of `Core`, created by `Core::weak_handle`.
/// Each field is the `Weak` form of the matching `Core` field.
#[derive(Clone)]
pub struct WeakCore {
state: Weak<Mutex<CoreState>>,
cross_partition: Weak<parking_lot::Mutex<CrossPartitionState>>,
binding: Weak<dyn BindingBoundary>,
registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
impl WeakCore {
    /// Rebuilds a strong `Core`, or returns `None` as soon as any of the four
    /// shared parts has already been dropped.
    #[must_use]
    pub fn upgrade(&self) -> Option<Core> {
        let state = self.state.upgrade()?;
        let cross_partition = self.cross_partition.upgrade()?;
        let binding = self.binding.upgrade()?;
        let registry = self.registry.upgrade()?;
        Some(Core {
            state,
            cross_partition,
            binding,
            registry,
        })
    }
}
/// Owns freshly built operator scratch until it is installed on a node.
/// If dropped while still holding scratch, its `Drop` impl calls
/// `release_handles` on it.
struct ScratchReleaseGuard<'a> {
// `None` once the scratch has been taken out (or when the operator has none).
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
// Binding passed to `release_handles` on the rollback path.
binding: &'a dyn BindingBoundary,
}
impl<'a> ScratchReleaseGuard<'a> {
fn new(
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
binding: &'a dyn BindingBoundary,
) -> Self {
Self { scratch, binding }
}
fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
self.scratch.take()
}
}
impl Drop for ScratchReleaseGuard<'_> {
    /// Releases the handles of any scratch that was never taken out.
    fn drop(&mut self) {
        let Some(mut abandoned) = self.scratch.take() else {
            return;
        };
        abandoned.release_handles(self.binding);
    }
}
impl Core {
#[must_use]
pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
Self {
state: Arc::new(Mutex::new(CoreState {
next_node_id: 1,
next_subscription_id: 1,
next_lock_id: 1u64 << 31,
nodes: HashMap::new(),
children: HashMap::new(),
pending_fires: HashSet::new(),
pending_notify: IndexMap::new(),
in_tick: false,
pause_buffer_cap: None,
max_batch_drain_iterations: 10_000,
deferred_flush_jobs: Vec::new(),
binding: binding.clone(),
topology_sinks: HashMap::new(),
next_topology_id: 1,
currently_firing: Vec::new(),
invalidate_hooks_fired_this_wave: ahash::AHashSet::new(),
deferred_cleanup_hooks: Vec::new(),
pending_wipes: Vec::new(),
})),
cross_partition: Arc::new(parking_lot::Mutex::new(CrossPartitionState::new(
binding.clone(),
))),
binding,
registry: Arc::new(parking_lot::Mutex::new(
crate::subgraph::SubgraphRegistry::new(),
)),
}
}
pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
self.state.lock()
}
pub(crate) fn lock_cross_partition(&self) -> parking_lot::MutexGuard<'_, CrossPartitionState> {
self.cross_partition.lock()
}
#[must_use]
pub fn same_dispatcher(&self, other: &Core) -> bool {
Arc::ptr_eq(&self.state, &other.state)
}
#[must_use]
pub fn weak_handle(&self) -> WeakCore {
WeakCore {
state: Arc::downgrade(&self.state),
cross_partition: Arc::downgrade(&self.cross_partition),
binding: Arc::downgrade(&self.binding),
registry: Arc::downgrade(&self.registry),
}
}
#[must_use]
pub fn partition_count(&self) -> usize {
self.registry.lock().component_count()
}
#[must_use]
pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
self.registry.lock().partition_of(node)
}
pub(crate) fn partition_wave_owner_lock_arc(&self, seed: NodeId) -> WaveOwnerGuard {
struct AcquireGuard {
sid: crate::subgraph::SubgraphId,
consumed: bool,
}
impl AcquireGuard {
fn into_consumed(mut self) {
self.consumed = true;
}
}
impl Drop for AcquireGuard {
fn drop(&mut self) {
if !self.consumed {
held_partitions::release(self.sid);
}
}
}
for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
let (sid, lock_box) = {
let mut reg = self.registry.lock();
reg.lock_for(seed).expect(
"partition_wave_owner_lock_arc: seed must be registered \
(P12-fix invariant: registry membership is published \
atomically with `s.nodes`)",
)
};
held_partitions::check_and_acquire(sid);
let acquire_guard = AcquireGuard {
sid,
consumed: false,
};
let inner = lock_box.wave_owner.lock_arc();
let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
if still_valid {
acquire_guard.into_consumed();
drop(lock_box);
return WaveOwnerGuard { sid, inner };
}
drop(inner);
drop(acquire_guard);
std::thread::yield_now();
}
panic!(
"partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
— pathological concurrent union/split activity. Mirrors py \
`_MAX_LOCK_RETRIES`.",
crate::subgraph::MAX_LOCK_RETRIES,
seed
);
}
pub(crate) fn compute_touched_partitions(
&self,
seed: NodeId,
) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
let s = self.lock_state();
let mut reg = self.registry.lock();
let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
let mut visited: HashSet<NodeId> = HashSet::default();
let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
stack.push(seed);
while let Some(n) = stack.pop() {
if !visited.insert(n) {
continue;
}
if let Some(p) = reg.partition_of(n) {
if !partitions.contains(&p) {
partitions.push(p);
}
}
if let Some(children) = s.children.get(&n) {
stack.extend(children.iter().copied());
}
if let Some(rec) = s.nodes.get(&n) {
stack.extend(rec.meta_companions.iter().copied());
}
}
partitions.sort_unstable_by_key(|sid| sid.raw());
partitions
}
pub(crate) fn all_partitions_lock_boxes(
&self,
) -> Vec<(
crate::subgraph::SubgraphId,
Arc<crate::subgraph::SubgraphLockBox>,
)> {
self.registry.lock().all_partitions()
}
}
/// Collects every node reachable from `start` when dep edges are treated as
/// undirected.
///
/// * `skip_edge` — an optional `(dep, consumer)` edge that is ignored in both
///   directions.
/// * `extra_edges` — additional `(a, b)` pairs followed in both directions.
///
/// The returned set always contains `start`.
pub(crate) fn walk_undirected_dep_graph(
    s: &CoreState,
    start: NodeId,
    skip_edge: Option<(NodeId, NodeId)>,
    extra_edges: &[(NodeId, NodeId)],
) -> HashSet<NodeId> {
    // `true` when the (dep -> consumer) edge is the one the caller asked to ignore.
    let edge_is_skipped = |dep: NodeId, consumer: NodeId| -> bool {
        skip_edge.is_some_and(|(sp, sc)| dep == sp && consumer == sc)
    };

    let mut reached: HashSet<NodeId> = HashSet::default();
    let mut frontier: SmallVec<[NodeId; 32]> = SmallVec::new();
    frontier.push(start);

    while let Some(node) = frontier.pop() {
        if !reached.insert(node) {
            continue;
        }
        // Downstream: nodes that consume `node`.
        if let Some(consumers) = s.children.get(&node) {
            frontier.extend(
                consumers
                    .iter()
                    .copied()
                    .filter(|&c| !edge_is_skipped(node, c) && !reached.contains(&c)),
            );
        }
        // Upstream: nodes that `node` depends on.
        if let Some(rec) = s.nodes.get(&node) {
            frontier.extend(
                rec.dep_records
                    .iter()
                    .map(|dep| dep.node)
                    .filter(|&d| !edge_is_skipped(d, node) && !reached.contains(&d)),
            );
        }
        // Caller-supplied edges, followed from either endpoint.
        for &(a, b) in extra_edges {
            if node == a && !reached.contains(&b) {
                frontier.push(b);
            }
            if node == b && !reached.contains(&a) {
                frontier.push(a);
            }
        }
    }
    reached
}
impl Core {
/// Number of pending batches queued for `node`, or `None` when the node has
/// no pending-notify entry. Only built for tests and debug builds.
#[cfg(any(test, debug_assertions))]
#[must_use]
pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
    let state = self.lock_state();
    let entry = state.pending_notify.get(&node)?;
    Some(entry.batches.len())
}
/// Stores `cap` as the core-wide pause-buffer cap.
pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
    let mut state = self.lock_state();
    state.pause_buffer_cap = cap;
}
/// Sets the replay-buffer capacity of `node_id`.
///
/// `Some(0)` is treated like `None` (replay disabled). Entries that no longer
/// fit are removed oldest-first, and their handles are released after the
/// state lock has been dropped.
///
/// # Panics
/// Panics (through `require_node_mut`) if `node_id` is not registered.
pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
    let cap = cap.filter(|&limit| limit != 0);
    let evicted: Vec<HandleId> = {
        let mut state = self.lock_state();
        let rec = state.require_node_mut(node_id);
        rec.replay_buffer_cap = cap;
        // With replay disabled nothing is kept, so the whole buffer is excess.
        let keep = cap.unwrap_or(0);
        let excess = rec.replay_buffer.len().saturating_sub(keep);
        rec.replay_buffer.drain(..excess).collect()
    };
    for handle in evicted {
        self.binding.release_handle(handle);
    }
}
/// Changes the pause mode of `node_id`.
///
/// # Errors
/// * `UnknownNode` — `node_id` is not registered.
/// * `WhilePaused` — the node is currently paused.
pub fn set_pausable_mode(
    &self,
    node_id: NodeId,
    mode: PausableMode,
) -> Result<(), SetPausableModeError> {
    let mut state = self.lock_state();
    let Some(rec) = state.nodes.get_mut(&node_id) else {
        return Err(SetPausableModeError::UnknownNode(node_id));
    };
    if rec.pause_state.is_paused() {
        Err(SetPausableModeError::WhilePaused)
    } else {
        rec.pausable = mode;
        Ok(())
    }
}
/// Sets the batch-drain iteration cap.
///
/// # Panics
/// Panics if `cap` is 0.
pub fn set_max_batch_drain_iterations(&self, cap: u32) {
    assert!(cap > 0, "max_batch_drain_iterations must be > 0");
    let mut state = self.lock_state();
    state.max_batch_drain_iterations = cap;
}
/// Forwards `message` from `node_id` to each of its deps.
///
/// `Pause` / `Resume` / `Invalidate` / `Teardown` are applied to every dep
/// (pause/resume results are ignored); any other allowed message is a no-op.
///
/// # Errors
/// * `UnknownNode` — `node_id` is not registered (checked first).
/// * `TierForbidden` — the message's tier is 3 or 5.
pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
    // Snapshot the dep ids so the state lock is not held while forwarding.
    let upstream: Vec<NodeId> = {
        let state = self.lock_state();
        let Some(rec) = state.nodes.get(&node_id) else {
            return Err(UpError::UnknownNode(node_id));
        };
        rec.dep_ids_vec()
    };
    let tier = message.tier();
    if matches!(tier, 3 | 5) {
        return Err(UpError::TierForbidden { tier });
    }
    // The message is the same for every dep, so dispatch on it once.
    match message {
        Message::Pause(lock) => {
            for dep_id in upstream {
                let _ = self.pause(dep_id, lock);
            }
        }
        Message::Resume(lock) => {
            for dep_id in upstream {
                let _ = self.resume(dep_id, lock);
            }
        }
        Message::Invalidate => {
            for dep_id in upstream {
                self.invalidate(dep_id);
            }
        }
        Message::Teardown => {
            for dep_id in upstream {
                self.teardown(dep_id);
            }
        }
        _ => {}
    }
    Ok(())
}
/// Allocates a fresh, never-before-returned lock id.
///
/// # Panics
/// Panics if the `u64` lock-id counter would overflow. A plain `+= 1` only
/// traps in builds with overflow checks enabled and would otherwise wrap,
/// handing out ids that were already issued.
#[must_use]
pub fn alloc_lock_id(&self) -> LockId {
    let mut s = self.lock_state();
    let raw = s.next_lock_id;
    // Same overflow policy as the other counters in this file: fail loudly.
    s.next_lock_id = raw
        .checked_add(1)
        .expect("alloc_lock_id: lock-id counter overflow");
    LockId::new(raw)
}
/// Registers the node described by `reg` and returns its new id.
///
/// On success the node is inserted into the node table, linked as a child of
/// each dep, registered in the partition registry (and unioned with each
/// dep's partition), and a `NodeRegistered` topology event is fired after the
/// state lock has been released.
///
/// A replay capacity of `Some(0)` is stored as `None`, the same canonical
/// "replay disabled" form that `set_replay_buffer_cap` stores.
///
/// # Errors
/// Checked in this order; the first failure wins:
/// 1. `OperatorWithoutDeps` — an operator with an empty dep list.
/// 2. `InitialOnlyForStateNodes` — `initial` set on a non-state shape.
/// 3. `OperatorSeedSentinel` — from building the operator scratch.
/// 4. `UnknownDep` — any dep is not registered.
/// 5. `TerminalDep` — any dep is terminal and not resubscribable.
///
/// If step 4 or 5 fails, handles retained while building the operator scratch
/// are released again.
pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
    let NodeRegistration {
        deps,
        fn_or_op,
        opts,
    } = reg;
    let NodeOpts {
        initial,
        equals,
        partial,
        is_dynamic,
        pausable,
        replay_buffer,
    } = opts;
    // Canonicalize "capacity 0" to "no replay buffer", as the runtime setter does.
    let replay_buffer_cap = replay_buffer.filter(|&cap| cap != 0);
    let (fn_id, op) = match fn_or_op {
        Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
        Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
        None => (None, None),
    };
    let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
    if op.is_some() && deps.is_empty() {
        return Err(RegisterError::OperatorWithoutDeps);
    }
    if initial != NO_HANDLE && !is_state_shape {
        return Err(RegisterError::InitialOnlyForStateNodes);
    }
    // Build the scratch (this may retain handles) before taking the state lock.
    let scratch = match op {
        Some(operator_op) => self.make_op_scratch(operator_op)?,
        None => None,
    };
    // Declared before `s` so that, on an early return, the state lock is
    // dropped first and the handles are released outside the lock.
    let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
    let mut s = self.lock_state();
    // Two separate passes on purpose: an unknown dep anywhere in the list is
    // reported in preference to a terminal dep.
    for &dep in &deps {
        if !s.nodes.contains_key(&dep) {
            return Err(RegisterError::UnknownDep(dep));
        }
    }
    for &dep in &deps {
        let dep_rec = s.require_node(dep);
        if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
            return Err(RegisterError::TerminalDep(dep));
        }
    }
    // Validation passed: the scratch now belongs to the node record.
    let installed_scratch = scratch_guard.take();
    let id = s.alloc_node_id();
    // Operators and static fn nodes track every dep; dynamic and dep-less nodes start empty.
    let tracked: HashSet<usize> = if op.is_some() {
        (0..deps.len()).collect()
    } else if is_dynamic {
        HashSet::new()
    } else if fn_id.is_some() && !deps.is_empty() {
        (0..deps.len()).collect()
    } else {
        HashSet::new()
    };
    let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
    let rec = NodeRecord {
        dep_records,
        fn_id,
        op,
        is_dynamic,
        equals,
        cache: initial,
        has_fired_once: initial != NO_HANDLE,
        subscribers: HashMap::new(),
        subscribers_revision: 0,
        tracked,
        dirty: false,
        involved_this_wave: false,
        pause_state: PauseState::Active,
        pausable,
        replay_buffer_cap,
        replay_buffer: VecDeque::new(),
        terminal: None,
        has_received_teardown: false,
        resubscribable: false,
        meta_companions: Vec::new(),
        partial,
        op_scratch: installed_scratch,
    };
    s.nodes.insert(id, rec);
    s.children.insert(id, HashSet::new());
    for &dep in &deps {
        s.children.entry(dep).or_default().insert(id);
    }
    {
        // Registry membership is published while the state lock is still held.
        let mut registry = self.registry.lock();
        registry.ensure_registered(id);
        for &dep in &deps {
            registry.union_nodes(id, dep);
        }
    }
    drop(s);
    self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
    Ok(id)
}
/// Registers a state node (no deps, no fn) with the given initial handle and
/// `partial` flag; all other options take their defaults.
///
/// # Errors
/// Propagates any error from `register`.
pub fn register_state(
    &self,
    initial: HandleId,
    partial: bool,
) -> Result<NodeId, RegisterError> {
    let opts = NodeOpts {
        initial,
        partial,
        ..NodeOpts::default()
    };
    let registration = NodeRegistration {
        deps: Vec::new(),
        fn_or_op: None,
        opts,
    };
    self.register(registration)
}
/// Registers a producer node: no deps, function `fn_id`, `partial` set to
/// `true`, all other options default.
///
/// # Errors
/// Propagates any error from `register`.
pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
    let opts = NodeOpts {
        partial: true,
        ..NodeOpts::default()
    };
    let registration = NodeRegistration {
        deps: Vec::new(),
        fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
        opts,
    };
    self.register(registration)
}
/// Registers a non-dynamic fn node over `deps` with the given equality mode
/// and `partial` flag; all other options default.
///
/// # Errors
/// Propagates any error from `register`.
pub fn register_derived(
    &self,
    deps: &[NodeId],
    fn_id: FnId,
    equals: EqualsMode,
    partial: bool,
) -> Result<NodeId, RegisterError> {
    let opts = NodeOpts {
        equals,
        partial,
        ..NodeOpts::default()
    };
    let registration = NodeRegistration {
        deps: deps.to_vec(),
        fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
        opts,
    };
    self.register(registration)
}
/// Registers a fn node over `deps` with `is_dynamic` set, plus the given
/// equality mode and `partial` flag; all other options default.
///
/// # Errors
/// Propagates any error from `register`.
pub fn register_dynamic(
    &self,
    deps: &[NodeId],
    fn_id: FnId,
    equals: EqualsMode,
    partial: bool,
) -> Result<NodeId, RegisterError> {
    let opts = NodeOpts {
        equals,
        partial,
        is_dynamic: true,
        ..NodeOpts::default()
    };
    let registration = NodeRegistration {
        deps: deps.to_vec(),
        fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
        opts,
    };
    self.register(registration)
}
/// Builds the per-node scratch state an operator needs, if any.
///
/// `Scan` / `Reduce` retain their seed and `Last` retains its default (unless
/// it is `NO_HANDLE`), so the caller owns those retains through the returned
/// scratch. Operators without state yield `Ok(None)`.
///
/// # Errors
/// `OperatorSeedSentinel` when a `Scan` / `Reduce` seed is `NO_HANDLE`;
/// nothing is retained in that case.
fn make_op_scratch(
    &self,
    op: OperatorOp,
) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
    use crate::op_state::{
        DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
        TakeWhileState,
    };
    match op {
        // Seeded operators reject the sentinel before anything is retained.
        OperatorOp::Scan { seed, .. } | OperatorOp::Reduce { seed, .. } if seed == NO_HANDLE => {
            Err(RegisterError::OperatorSeedSentinel)
        }
        OperatorOp::Scan { seed, .. } => {
            let state = Box::new(ScanState { acc: seed });
            self.binding.retain_handle(seed);
            Ok(Some(state))
        }
        OperatorOp::Reduce { seed, .. } => {
            let state = Box::new(ReduceState { acc: seed });
            self.binding.retain_handle(seed);
            Ok(Some(state))
        }
        OperatorOp::Last { default } => {
            let state = Box::new(LastState {
                latest: NO_HANDLE,
                default,
            });
            if default != NO_HANDLE {
                self.binding.retain_handle(default);
            }
            Ok(Some(state))
        }
        // Stateful operators whose scratch holds no handle up front.
        OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
        OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
        OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
        OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
        OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
        // Operators that get no scratch at all.
        OperatorOp::Map { .. }
        | OperatorOp::Filter { .. }
        | OperatorOp::Combine { .. }
        | OperatorOp::WithLatestFrom { .. }
        | OperatorOp::Merge => Ok(None),
    }
}
pub fn register_operator(
&self,
deps: &[NodeId],
op: OperatorOp,
opts: OperatorOpts,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Op(op)),
opts: NodeOpts {
equals: opts.equals,
partial: opts.partial,
..NodeOpts::default()
},
})
}
/// Attaches `sink` to `node_id` and returns the handle that detaches it on drop.
///
/// Steps, all while the node's partition wave-owner lock is held:
/// 1. Under the state lock: reset the node if it is resubscribable, terminal
///    and has not received teardown; build the handshake slices; insert the sink.
/// 2. If a reset happened, call `wipe_ctx` on the binding.
/// 3. Deliver the handshake to the sink, one slice per call, in this order:
///    `Start`, cached `Data`, replayed `Data`, terminal, `Teardown`.
/// 4. If this was the first subscriber of a non-state node, run an activation wave.
///
/// # Panics
/// - Through `require_node` if `node_id` is not registered (and through the
///   partition lookup if it is missing from the registry).
/// - If the sink panics during the handshake, the sink is removed again and
///   the panic is re-raised.
#[allow(clippy::needless_pass_by_value)] pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
// Held until the function returns (or unwinds).
let _wave_guard = self.partition_wave_owner_lock_arc(node_id);
let (sub_id, tier_slices, needs_activation, did_reset) = {
let mut s = self.lock_state();
let sub_id = s.alloc_sub_id();
let needs_reset = {
let rec = s.require_node(node_id);
rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
};
if needs_reset {
self.reset_for_fresh_lifecycle(&mut s, node_id);
}
// Read AFTER the optional reset so the handshake reflects the fresh lifecycle.
let (cache, is_state, first_subscriber, terminal, torn_down) = {
let rec = s.require_node(node_id);
(
rec.cache,
rec.is_state(),
rec.subscribers.is_empty(),
rec.terminal,
rec.has_received_teardown,
)
};
let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
tier_slices.push(vec![Message::Start]);
if cache != NO_HANDLE {
tier_slices.push(vec![Message::Data(cache)]);
}
// Replay is disabled when the cap is `None` or 0.
let replay_handles: Vec<HandleId> = {
let rec = s.require_node(node_id);
let cap = rec.replay_buffer_cap.unwrap_or(0);
if cap == 0 {
Vec::new()
} else {
let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
// The cached value was already queued above; do not send it twice.
if cache != NO_HANDLE && v.last() == Some(&cache) {
v.pop();
}
v
}
};
for h in &replay_handles {
tier_slices.push(vec![Message::Data(*h)]);
}
if let Some(t) = terminal {
tier_slices.push(vec![match t {
TerminalKind::Complete => Message::Complete,
TerminalKind::Error(h) => Message::Error(h),
}]);
}
if torn_down {
tier_slices.push(vec![Message::Teardown]);
}
{
let rec = s.require_node_mut(node_id);
rec.subscribers.insert(sub_id, sink.clone());
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
let needs_activation = first_subscriber && !is_state;
(sub_id, tier_slices, needs_activation, needs_reset)
};
// From here on the state lock is released; the binding and the sink run without it.
if did_reset {
self.binding.wipe_ctx(node_id);
}
for slice in &tier_slices {
let sink_clone = sink.clone();
let slice_ref: &[Message] = slice;
let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
if let Err(panic_payload) = result {
// Roll back the registration before propagating the sink's panic.
{
let mut s = self.lock_state();
if let Some(rec) = s.nodes.get_mut(&node_id) {
rec.subscribers.remove(&sub_id);
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
}
std::panic::resume_unwind(panic_payload);
}
}
if needs_activation {
self.run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.activate_derived(&mut s, node_id);
});
}
Subscription {
state: Arc::downgrade(&self.state),
node_id,
sub_id,
}
}
/// Sets the `resubscribable` flag of `node_id`.
///
/// # Panics
/// Panics if the node is unknown (via `require_node_mut`) or if it already
/// has at least one subscriber.
pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
    let mut state = self.lock_state();
    let record = state.require_node_mut(node_id);
    let has_subscribers = !record.subscribers.is_empty();
    assert!(
        !has_subscribers,
        "set_resubscribable: node already has subscribers; \
         configure resubscribable before any subscribe call"
    );
    record.resubscribable = resubscribable;
}
/// Returns `node_id` to a non-terminal, non-torn-down state so that a new
/// subscriber starts a fresh lifecycle (called from `subscribe`).
///
/// Clears the node's terminal, teardown, dirty and wave flags, resets every
/// dep record, empties the pause and replay buffers, rebuilds the operator
/// scratch from the stored op, and releases every handle that the cleared
/// slots were holding. The node's `cache` is left untouched.
///
/// # Panics
/// Panics if the node is unknown, or if `make_op_scratch` rejects the
/// stored operator.
fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
let rec = s.require_node_mut(node_id);
// Collect handles referenced by the node terminal and by each dep record.
let mut hs = Vec::new();
if let Some(TerminalKind::Error(h)) = rec.terminal {
hs.push(h);
}
for dr in &rec.dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
hs.push(h);
}
for &h in &dr.data_batch {
hs.push(h);
}
if dr.prev_data != NO_HANDLE {
hs.push(dr.prev_data);
}
}
// `pulled` gathers payload handles from the pause buffer and all replay-buffer handles.
let mut pulled = Vec::new();
if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
for msg in buffer.drain(..) {
if let Some(h) = msg.payload_handle() {
pulled.push(h);
}
}
}
for h in rec.replay_buffer.drain(..) {
pulled.push(h);
}
rec.terminal = None;
// Only a state node that still has a cached value counts as having fired.
rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
rec.has_received_teardown = false;
for dr in &mut rec.dep_records {
dr.prev_data = NO_HANDLE;
dr.data_batch.clear();
dr.terminal = None;
dr.dirty = false;
dr.involved_this_wave = false;
}
rec.pause_state = PauseState::Active;
rec.involved_this_wave = false;
rec.dirty = false;
if rec.is_dynamic {
rec.tracked.clear();
}
let prev_op = rec.op;
// Take the scratch out so its handles can be released once the record borrow ends.
let old = std::mem::take(&mut rec.op_scratch);
(prev_op, old, hs, pulled)
};
// Build the replacement scratch before the old scratch's handles are released.
let new_scratch = match prev_op {
Some(op) => self
.make_op_scratch(op)
.expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
None => None,
};
if let Some(scratch) = old_scratch.as_mut() {
scratch.release_handles(&*self.binding);
}
drop(old_scratch);
{
let rec = s.require_node_mut(node_id);
rec.op_scratch = new_scratch;
}
// Release everything collected above through the binding.
for h in handles_to_release {
self.binding.release_handle(h);
}
for h in pause_buffer_payloads {
self.binding.release_handle(h);
}
}
/// Activates `root` together with every transitive dependency that is not
/// a state node, has no cached value and has never fired.
///
/// Phase 1 walks the dependency graph with an explicit stack and records
/// nodes in post-order, so dependencies precede their dependents.
/// Phase 2 visits that order: producers are queued in `pending_fires`;
/// every other node receives the cached value of each dependency that has
/// one, through `deliver_data_to_consumer`.
pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
    let mut seen: HashSet<NodeId> = HashSet::new();
    let mut activation_order: Vec<NodeId> = Vec::new();
    // The bool marks the second visit of a node, once its deps were pushed.
    let mut pending: Vec<(NodeId, bool)> = vec![(root, false)];

    while let Some((node, deps_expanded)) = pending.pop() {
        if deps_expanded {
            activation_order.push(node);
        } else if seen.insert(node) {
            pending.push((node, true));
            let deps: Vec<NodeId> = s.require_node(node).dep_ids_vec();
            for dep in deps {
                let dep_rec = s.require_node(dep);
                let is_state = dep_rec.is_state();
                let never_produced = dep_rec.cache == NO_HANDLE && !dep_rec.has_fired_once;
                if !is_state && never_produced && !seen.contains(&dep) {
                    pending.push((dep, false));
                }
            }
        }
    }

    for &node in &activation_order {
        let (deps, is_producer) = {
            let rec = s.require_node(node);
            (rec.dep_ids_vec(), rec.is_producer())
        };
        if is_producer {
            s.pending_fires.insert(node);
        } else {
            for (slot, &dep) in deps.iter().enumerate() {
                let cached = s.require_node(dep).cache;
                if cached != NO_HANDLE {
                    self.deliver_data_to_consumer(s, node, slot, cached);
                }
            }
        }
    }
}
/// Emits `new_handle` as DATA from a state or producer node.
///
/// When the node is already terminal the emission is discarded and
/// `new_handle` is released through the binding. Otherwise the value is
/// committed inside a wave via `commit_emission`.
///
/// # Panics
/// Panics if `new_handle` is `NO_HANDLE`, if the node is unknown, or if the
/// node is neither a state node nor a producer.
pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
    assert!(
        new_handle != NO_HANDLE,
        "NO_HANDLE is not a valid DATA payload (R1.2.4)"
    );
    // The state lock is confined to this block, so it is released before
    // the binding is called or the wave starts.
    let already_terminal = {
        let state = self.lock_state();
        let record = state.require_node(node_id);
        assert!(
            record.is_state() || record.is_producer(),
            "emit() is for state or producer nodes only; \
             derived/dynamic/operator emit via their fn return value"
        );
        record.terminal.is_some()
    };
    if already_terminal {
        self.binding.release_handle(new_handle);
        return;
    }
    self.run_wave_for(node_id, |this| {
        this.commit_emission(node_id, new_handle);
    });
}
/// Returns the handle stored in the `cache` field of `node_id`
/// (`NO_HANDLE` when nothing is cached).
///
/// # Panics
/// Panics if `node_id` is not a registered node.
#[must_use]
pub fn cache_of(&self, node_id: NodeId) -> HandleId {
    let state = self.lock_state();
    state.require_node(node_id).cache
}
/// Returns the `has_fired_once` flag of `node_id`.
///
/// # Panics
/// Panics if `node_id` is not a registered node.
#[must_use]
pub fn has_fired_once(&self, node_id: NodeId) -> bool {
    let state = self.lock_state();
    state.require_node(node_id).has_fired_once
}
/// Returns the ids of all registered nodes, in the iteration order of the
/// internal node map.
#[must_use]
pub fn node_ids(&self) -> Vec<NodeId> {
    let state = self.lock_state();
    let ids: Vec<NodeId> = state.nodes.keys().copied().collect();
    ids
}
/// Returns how many nodes are currently registered.
#[must_use]
pub fn node_count(&self) -> usize {
    let state = self.lock_state();
    state.nodes.len()
}
#[must_use]
pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
}
#[must_use]
pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
self.lock_state()
.nodes
.get(&node_id)
.map(NodeRecord::dep_ids_vec)
.unwrap_or_default()
}
/// Returns the terminal recorded on `node_id`, or `None` when the node is
/// unknown or has not terminated.
#[must_use]
pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
    let state = self.lock_state();
    match state.nodes.get(&node_id) {
        Some(record) => record.terminal,
        None => None,
    }
}
/// Returns `true` only when `node_id` is registered and its `dirty` flag is
/// set; unknown nodes report `false`.
#[must_use]
pub fn is_dirty(&self, node_id: NodeId) -> bool {
    let state = self.lock_state();
    matches!(state.nodes.get(&node_id), Some(record) if record.dirty)
}
/// Returns a copy of the meta-companion list of `parent`; an unknown node
/// yields an empty vector.
#[must_use]
pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
    let state = self.lock_state();
    match state.nodes.get(&parent) {
        Some(record) => record.meta_companions.clone(),
        None => Vec::new(),
    }
}
}
impl Core {
/// Terminates `node_id` with `TerminalKind::Complete`.
///
/// # Panics
/// Panics if `node_id` is not a registered node.
pub fn complete(&self, node_id: NodeId) {
    let terminal = TerminalKind::Complete;
    self.emit_terminal(node_id, terminal);
}
pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
assert!(
error_handle != NO_HANDLE,
"NO_HANDLE is not a valid ERROR payload (R1.2.5)"
);
self.emit_terminal(node_id, TerminalKind::Error(error_handle));
self.binding.release_handle(error_handle);
}
/// Runs a wave that applies `terminal` to `node_id` via `terminate_node`.
///
/// # Panics
/// Panics with `unknown node …` if `node_id` is not registered.
fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
    // The guard is a statement temporary, so the lock is released before
    // the wave starts.
    let is_registered = self.lock_state().nodes.contains_key(&node_id);
    assert!(is_registered, "unknown node {node_id:?}");
    self.run_wave_for(node_id, |this| {
        let mut state = this.lock_state();
        this.terminate_node(&mut state, node_id, terminal);
    });
}
/// Marks `node_id` as terminated with `terminal` and propagates the
/// terminal through its children using an explicit worklist.
///
/// For every node taken from the worklist that is not already terminal:
/// the terminal is stored (retaining an error handle for that slot), the
/// node is queued in `pending_wipes` when it is resubscribable with no
/// subscribers, it is removed from `pending_fires`, its pause and replay
/// buffers are drained and their handles released, and the terminal message
/// is queued for notification. Each child then records the terminal in the
/// matching dep slot and either cascades, is queued to fire, or is left
/// alone.
fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
while let Some((id, t)) = work.pop() {
// A node that is already terminal is skipped.
if s.require_node(id).terminal.is_some() {
continue; }
// One retain for the node's own `terminal` slot.
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
let queue_wipe = {
let rec = s.require_node(id);
rec.resubscribable && rec.subscribers.is_empty()
};
s.require_node_mut(id).terminal = Some(t);
if queue_wipe {
s.pending_wipes.push(id);
}
s.pending_fires.remove(&id);
// Drain buffered pause messages and replay handles; their handles are released below.
let drained: Vec<HandleId> = {
let rec = s.require_node_mut(id);
let mut drained: Vec<HandleId> = Vec::new();
if rec.pause_state.is_paused() {
let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
if let PauseState::Paused { buffer, .. } = prev {
drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
}
}
drained.extend(rec.replay_buffer.drain(..));
drained
};
for h in drained {
self.binding.release_handle(h);
}
let msg = match t {
TerminalKind::Complete => Message::Complete,
TerminalKind::Error(h) => Message::Error(h),
};
self.queue_notify(s, id, msg);
// Snapshot the child ids so `s` can be mutated while iterating.
let child_ids: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(id);
let Some(idx) = dep_idx else { continue };
{
let child = s.require_node_mut(child_id);
// A dep slot that already holds a terminal is left as it is.
if child.dep_records[idx].terminal.is_some() {
continue;
}
child.dep_records[idx].terminal = Some(t);
}
// One retain for the child's dep-record slot.
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
let action = {
let child = s.require_node(child_id);
if child.terminal.is_some() {
ChildAction::None } else if child.all_deps_terminal() {
if child.skips_auto_cascade() {
ChildAction::QueueFire
} else {
ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
}
} else {
ChildAction::None
}
};
match action {
ChildAction::None => {}
ChildAction::Cascade(t_child) => {
// The child is terminated later, when it is popped from the worklist.
work.push((child_id, t_child));
}
ChildAction::QueueFire => {
s.pending_fires.insert(child_id);
}
}
}
}
}
}
/// Decision taken by `terminate_node` for a child after one of its
/// dependencies recorded a terminal.
enum ChildAction {
/// Nothing to do: the child is already terminal, or not all of its deps are terminal.
None,
/// All deps are terminal: terminate the child with the carried terminal.
Cascade(TerminalKind),
/// All deps are terminal but the child skips auto-cascade: queue it in `pending_fires`.
QueueFire,
}
fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
for dr in dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
return TerminalKind::Error(h);
}
}
TerminalKind::Complete
}
impl Core {
pub fn teardown(&self, node_id: NodeId) {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
let torn_down_for_wave = torn_down.clone();
self.run_wave_for(node_id, move |this| {
let mut s = this.lock_state();
let collected = this.teardown_inner(&mut s, node_id);
torn_down_for_wave.lock().extend(collected);
});
let ids = std::mem::take(&mut *torn_down.lock());
for id in ids {
self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
}
}
/// Marks `root`, its meta companions and its transitive children as torn
/// down and queues a `Teardown` message for each of them.
///
/// Nodes that are not yet terminal are first terminated with
/// `TerminalKind::Complete`. Returns the ids in the order their teardown
/// was emitted. A node whose `has_received_teardown` flag is already set is
/// skipped.
fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
// Work items for the explicit stack below.
enum Action {
// First visit: mark the node and schedule its neighbours.
Visit(NodeId),
// Deferred step: terminate if needed and queue the Teardown message.
EmitTeardown(NodeId),
}
let mut stack: Vec<Action> = vec![Action::Visit(root)];
let mut torn_down: Vec<NodeId> = Vec::new();
while let Some(action) = stack.pop() {
match action {
Action::Visit(id) => {
if s.require_node(id).has_received_teardown {
continue; }
s.require_node_mut(id).has_received_teardown = true;
let children: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
// The stack is LIFO: children are pushed first, so they are popped last.
for &child in children.iter().rev() {
stack.push(Action::Visit(child));
}
stack.push(Action::EmitTeardown(id));
// Meta companions are pushed last, so they are visited before this node's own emit.
let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
for &meta in metas.iter().rev() {
stack.push(Action::Visit(meta));
}
}
Action::EmitTeardown(id) => {
let already_terminal = s.require_node(id).terminal.is_some();
if !already_terminal {
self.terminate_node(s, id, TerminalKind::Complete);
}
self.queue_notify(s, id, Message::Teardown);
torn_down.push(id);
}
}
}
torn_down
}
/// Registers `companion` as a meta companion of `parent`. Adding the same
/// companion twice leaves the list unchanged.
///
/// # Panics
/// Panics if `parent == companion` or if either node is unknown.
pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
    assert!(parent != companion, "node cannot be its own meta companion");
    let mut state = self.lock_state();
    assert!(state.nodes.contains_key(&parent), "unknown parent {parent:?}");
    assert!(
        state.nodes.contains_key(&companion),
        "unknown companion {companion:?}"
    );
    let companions = &mut state.require_node_mut(parent).meta_companions;
    let already_linked = companions.iter().any(|&existing| existing == companion);
    if !already_linked {
        companions.push(companion);
    }
}
}
impl Core {
/// Invalidates `node_id` inside a wave by delegating to
/// `invalidate_inner`.
///
/// # Panics
/// Panics with `unknown node …` if `node_id` is not registered.
pub fn invalidate(&self, node_id: NodeId) {
    // The guard is a statement temporary, so the lock is released before
    // the wave starts.
    let is_registered = self.lock_state().nodes.contains_key(&node_id);
    assert!(is_registered, "unknown node {node_id:?}");
    self.run_wave_for(node_id, |this| {
        let mut state = this.lock_state();
        this.invalidate_inner(&mut state, node_id);
    });
}
/// Clears the cache of `root` and of every transitive child that still has
/// a cached value, using an explicit worklist.
///
/// For each node with a cache: the handle is released, an `OnInvalidate`
/// cleanup hook is deferred (at most once per node per wave), and an
/// `Invalidate` message is queued. The dep slots its children hold for it
/// are cleared, with their handles pushed to the deferred-release list.
fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
let mut work: Vec<NodeId> = vec![root];
while let Some(node_id) = work.pop() {
let old_handle = s.require_node(node_id).cache;
// A node without a cache is skipped, and its children are not visited through it.
if old_handle == NO_HANDLE {
continue;
}
s.require_node_mut(node_id).cache = NO_HANDLE;
self.binding.release_handle(old_handle);
// `insert` returns true only the first time the node is seen in this wave.
if s.invalidate_hooks_fired_this_wave.insert(node_id) {
s.deferred_cleanup_hooks
.push((node_id, CleanupTrigger::OnInvalidate));
}
self.queue_notify(s, node_id, Message::Invalidate);
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
let dr = &s.require_node(child_id).dep_records[idx];
(dr.prev_data, dr.data_batch.clone())
};
// These handles are queued for deferred release rather than released immediately.
{
let mut cps = self.cross_partition.lock();
if old_prev != NO_HANDLE {
cps.deferred_handle_releases.push(old_prev);
}
for h in batch_hs {
cps.deferred_handle_releases.push(h);
}
}
let dr = &mut s.require_node_mut(child_id).dep_records[idx];
dr.prev_data = NO_HANDLE;
dr.data_batch.clear();
work.push(child_id);
}
}
}
}
}
/// Summary returned by `Core::resume` when the pause buffer is handed back.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ResumeReport {
/// Number of buffered messages handed to the subscribers (capped at `u32::MAX`).
pub replayed: u32,
/// Count reported by `PauseState::remove_lock` alongside the buffer;
/// presumably the number of messages discarded while paused — TODO confirm.
pub dropped: u32,
}
impl Core {
/// Adds pause lock `lock_id` to `node_id`.
///
/// The call is a successful no-op when the node is already terminal or its
/// pausable mode is `PausableMode::Off`.
///
/// # Errors
/// `PauseError::UnknownNode` when `node_id` is not registered.
pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
    let mut state = self.lock_state();
    let Some(record) = state.nodes.get_mut(&node_id) else {
        return Err(PauseError::UnknownNode(node_id));
    };
    let ignores_pause = record.terminal.is_some() || record.pausable == PausableMode::Off;
    if !ignores_pause {
        record.pause_state.add_lock(lock_id);
    }
    Ok(())
}
/// Removes pause lock `lock_id` from `node_id`.
///
/// Returns `Ok(None)` when the node's pausable mode is `Off` or when
/// `remove_lock` yields nothing. Otherwise the buffered messages are
/// delivered to every current subscriber, their payload handles are
/// released, and a `ResumeReport` is returned. In `PausableMode::Default`,
/// a pending wave recorded on the pause state queues the node in
/// `pending_fires` and triggers an empty wave.
///
/// # Errors
/// `PauseError::UnknownNode` when `node_id` is not registered.
pub fn resume(
&self,
node_id: NodeId,
lock_id: LockId,
) -> Result<Option<ResumeReport>, PauseError> {
let (sinks, messages, dropped, pending_wave_for_default) = {
let mut s = self.lock_state();
let rec = s
.nodes
.get_mut(&node_id)
.ok_or(PauseError::UnknownNode(node_id))?;
if rec.pausable == PausableMode::Off {
return Ok(None);
}
let was_default_mode = rec.pausable == PausableMode::Default;
// The pending-wave marker is only consulted in Default mode.
let pending_wave = if was_default_mode {
rec.pause_state.take_pending_wave()
} else {
false
};
let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
// Nothing was handed back: restore the marker taken above.
if pending_wave {
rec.pause_state.mark_pending_wave();
}
return Ok(None);
};
// Snapshot sinks and messages so delivery happens after the state lock is released.
let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
let messages: Vec<Message> = buffer.into_iter().collect();
if pending_wave && was_default_mode {
s.pending_fires.insert(node_id);
}
(sinks, messages, dropped, pending_wave && was_default_mode)
};
// Saturate rather than truncate when the count does not fit in u32.
let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
if !messages.is_empty() {
for sink in &sinks {
sink(&messages);
}
// After delivery, release the payload handle of each replayed message.
for msg in &messages {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
if pending_wave_for_default {
// Empty closure: presumably the wave machinery drains `pending_fires` itself — TODO confirm.
self.run_wave_for(node_id, |_this| {
});
}
Ok(Some(ResumeReport { replayed, dropped }))
}
/// Returns whether the pause state of `node_id` reports itself as paused.
///
/// # Panics
/// Panics if `node_id` is not a registered node.
#[must_use]
pub fn is_paused(&self, node_id: NodeId) -> bool {
    let state = self.state.lock();
    let record = state.require_node(node_id);
    record.pause_state.is_paused()
}
/// Returns the lock count reported by the pause state of `node_id`.
///
/// # Panics
/// Panics if `node_id` is not a registered node.
#[must_use]
pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
    let state = self.state.lock();
    let record = state.require_node(node_id);
    record.pause_state.lock_count()
}
/// Returns whether the pause state of `node_id` contains `lock_id`.
///
/// # Panics
/// Panics if `node_id` is not a registered node.
#[must_use]
pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
    let state = self.state.lock();
    let record = state.require_node(node_id);
    record.pause_state.contains_lock(lock_id)
}
}
/// Reasons `Core::set_deps` refuses to rewire a node. All of them are
/// returned before any dependency record is modified.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum SetDepsError {
/// `new_deps` contains the node being rewired.
#[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
SelfDependency { n: NodeId },
/// A newly added dep is reachable from `n` through child edges; `path` is the route found.
#[error(
"set_deps({n:?}, ...): cycle would form via path {path:?} \
(adding {added_dep:?} → {n:?} closes the loop)"
)]
WouldCreateCycle {
n: NodeId,
added_dep: NodeId,
path: Vec<NodeId>,
},
/// The node being rewired, or one of the requested deps, is not registered.
#[error("set_deps: unknown node {0:?}")]
UnknownNode(NodeId),
/// The node being rewired is a state node or a producer.
#[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
NotComputeNode(NodeId),
/// The node being rewired already holds a terminal.
#[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
TerminalNode { n: NodeId },
/// A newly added dep is terminal and not resubscribable.
#[error(
"set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
either mark it resubscribable before terminate, or remove the dep from new_deps"
)]
TerminalDep { n: NodeId, dep: NodeId },
/// The node being rewired is in the `currently_firing` set.
#[error(
"set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
(set_deps from inside the firing node's own fn would corrupt the \
Dynamic `tracked` indices snapshot taken before invoke_fn). \
Schedule the rewire outside this fire scope."
)]
ReentrantOnFiringNode { n: NodeId },
/// The rewire would merge or split the partition of a node that is currently firing.
#[error(
"set_deps({n:?}, ...): rejected — would migrate the partition of \
currently-firing node {firing:?} mid-wave (union/split during \
fire would invalidate the held wave_owner Arc). Schedule the \
rewire outside the wave."
)]
PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
}
impl Core {
#[allow(clippy::too_many_lines)]
pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
let mut s = self.lock_state();
let (is_state, is_producer, is_terminal) = {
let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
(rec.is_state(), rec.is_producer(), rec.terminal.is_some())
};
if is_state || is_producer {
return Err(SetDepsError::NotComputeNode(n));
}
if is_terminal {
return Err(SetDepsError::TerminalNode { n });
}
if s.currently_firing.contains(&n) {
return Err(SetDepsError::ReentrantOnFiringNode { n });
}
if new_deps.contains(&n) {
return Err(SetDepsError::SelfDependency { n });
}
for &d in new_deps {
if !s.nodes.contains_key(&d) {
return Err(SetDepsError::UnknownNode(d));
}
}
let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
let added: HashSet<NodeId> = new_deps_set.difference(¤t_deps).copied().collect();
for &d in &added {
if let Some(path) = self.path_from_to(&s, n, d) {
return Err(SetDepsError::WouldCreateCycle {
n,
added_dep: d,
path,
});
}
}
for &d in &added {
let dep_rec = s.require_node(d);
if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
return Err(SetDepsError::TerminalDep { n, dep: d });
}
}
let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
if !s.currently_firing.is_empty() && (!added.is_empty() || !removed.is_empty()) {
let mut reg = self.registry.lock();
let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> = s
.currently_firing
.iter()
.filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
.collect();
if !firing_with_partition.is_empty() {
let part_n = reg.partition_of(n);
for &added_dep in &added {
let part_added = reg.partition_of(added_dep);
if part_n == part_added {
continue; }
let affected = [part_n, part_added];
if let Some(&(firing, _)) = firing_with_partition
.iter()
.find(|(_, p)| affected.contains(&Some(*p)))
{
return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
}
}
let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
for &removed_dep in &removed {
let part_removed = reg.partition_of(removed_dep);
let visited = walk_undirected_dep_graph(
&s,
removed_dep,
Some((removed_dep, n)),
&added_edges,
);
let would_disconnect = !visited.contains(&n);
if would_disconnect {
if let Some(&(firing, _)) = firing_with_partition
.iter()
.find(|(_, p)| Some(*p) == part_removed)
{
return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
}
}
}
}
}
if added.is_empty() && removed.is_empty() {
return Ok(());
}
let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
let rec = s.require_node(n);
let old_by_node: HashMap<NodeId, &DepRecord> =
rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
let new_records: Vec<DepRecord> = new_deps_vec
.iter()
.map(|&d| {
if let Some(old) = old_by_node.get(&d) {
DepRecord {
node: d,
prev_data: old.prev_data,
dirty: old.dirty,
involved_this_wave: old.involved_this_wave,
data_batch: old.data_batch.clone(),
terminal: old.terminal,
}
} else {
DepRecord::new(d)
}
})
.collect();
let mut to_release: Vec<HandleId> = Vec::new();
for d in &removed {
if let Some(old) = old_by_node.get(d) {
if let Some(TerminalKind::Error(h)) = old.terminal {
to_release.push(h);
}
for &h in &old.data_batch {
to_release.push(h);
}
}
}
(new_records, to_release)
};
let fire_set_deps_on_rerun;
{
let rec = s.require_node_mut(n);
fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
rec.dep_records = new_dep_records;
if rec.is_dynamic {
rec.tracked.clear();
rec.has_fired_once = false;
} else {
rec.tracked = (0..new_deps_vec.len()).collect();
}
}
for &removed_dep in &removed {
if let Some(set) = s.children.get_mut(&removed_dep) {
set.remove(&n);
}
}
for &added_dep in &added {
s.children.entry(added_dep).or_default().insert(n);
}
let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
{
let mut reg = self.registry.lock();
for &added_dep in &added {
reg.union_nodes(n, added_dep);
}
for &removed_dep in &removed {
let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
if visited.contains(&n) {
continue;
}
let original_root = reg.find(removed_dep);
let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
let component_nodes: Vec<NodeId> = snapshot_keys
.into_iter()
.filter(|&node| reg.find(node) == original_root)
.collect();
let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
for &node in &component_nodes {
if let Some(rec) = s.nodes.get(&node) {
for d in rec.dep_records.iter().map(|r| r.node) {
if component_set.contains(&d) {
edges_in_component.push((d, node));
}
}
}
}
let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
reg.on_edge_removed(n, removed_dep);
}
}
drop(s);
if fire_set_deps_on_rerun {
self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
}
self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
node: n,
old_deps: old_deps_vec,
new_deps: new_deps_vec.clone(),
});
if !added_for_wave.is_empty() {
self.run_wave_for(n, |this| {
let mut s = this.lock_state();
if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
return;
}
for added_dep in &added_for_wave {
let cache = match s.nodes.get(added_dep) {
Some(rec) => rec.cache,
None => continue, };
if cache == NO_HANDLE {
continue;
}
let dep_idx = s.require_node(n).dep_index_of(*added_dep);
if let Some(idx) = dep_idx {
this.deliver_data_to_consumer(&mut s, n, idx, cache);
}
}
});
}
for h in removed_handles {
self.binding.release_handle(h);
}
Ok(())
}
/// Depth-first search along `children` edges for a route from `from` to
/// `to`.
///
/// Returns the node sequence of the first route found (starting with
/// `from` and ending with `to`), or `None` when `to` is not reachable.
/// `from == to` yields the single-element path.
fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
    if from == to {
        return Some(vec![from]);
    }
    let mut explored: HashSet<NodeId> = HashSet::new();
    // Each frontier entry carries the trail that led to it.
    let mut frontier: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
    while let Some((node, trail)) = frontier.pop() {
        if !explored.insert(node) {
            continue;
        }
        if node == to {
            return Some(trail);
        }
        let Some(children) = s.children.get(&node) else {
            continue;
        };
        frontier.extend(children.iter().map(|&child| {
            let mut extended = Vec::with_capacity(trail.len() + 1);
            extended.extend_from_slice(&trail);
            extended.push(child);
            (child, extended)
        }));
    }
    None
}
}
impl CoreState {
/// Hands out the next node id and advances the `next_node_id` counter by
/// one.
fn alloc_node_id(&mut self) -> NodeId {
    let raw = self.next_node_id;
    self.next_node_id = raw + 1;
    NodeId::new(raw)
}
/// Hands out the next subscription id and advances the
/// `next_subscription_id` counter by one.
fn alloc_sub_id(&mut self) -> SubscriptionId {
    let raw = self.next_subscription_id;
    self.next_subscription_id = raw + 1;
    SubscriptionId(raw)
}
/// Resets all per-wave bookkeeping.
///
/// Clears `currently_firing` and `invalidate_hooks_fired_this_wave`, resets
/// the dirty and involvement flags on every node and dep record, and
/// rotates each non-empty `data_batch`: its last handle becomes the new
/// `prev_data`, while the earlier batch handles and the previous
/// `prev_data` are queued in `cps.deferred_handle_releases`.
pub(crate) fn clear_wave_state(&mut self, cps: &mut CrossPartitionState) {
    self.currently_firing.clear();
    self.invalidate_hooks_fired_this_wave.clear();
    for node in self.nodes.values_mut() {
        node.dirty = false;
        node.involved_this_wave = false;
        for dep in &mut node.dep_records {
            if let Some((&latest, earlier)) = dep.data_batch.split_last() {
                for &handle in earlier {
                    cps.deferred_handle_releases.push(handle);
                }
                if dep.prev_data != NO_HANDLE {
                    cps.deferred_handle_releases.push(dep.prev_data);
                }
                // The reference held for `latest` moves from the batch to
                // the `prev_data` slot, so it is not released here.
                dep.prev_data = latest;
                dep.data_batch.clear();
            }
            dep.dirty = false;
            dep.involved_this_wave = false;
        }
    }
}
/// Returns the record of node `id`.
///
/// # Panics
/// Panics with `unknown node …` when `id` is not registered.
pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
    match self.nodes.get(&id) {
        Some(record) => record,
        None => panic!("unknown node {id:?}"),
    }
}
/// Returns the mutable record of node `id`.
///
/// # Panics
/// Panics with `unknown node …` when `id` is not registered.
pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
    match self.nodes.get_mut(&id) {
        Some(record) => record,
        None => panic!("unknown node {id:?}"),
    }
}
}
/// Releases every handle still referenced by the state when it is dropped,
/// so the binding can reclaim the underlying values.
impl Drop for CoreState {
/// Walks all nodes and the pending notifications, releasing each handle
/// they hold through `binding.release_handle`.
fn drop(&mut self) {
// Take the pending notifications out first; their payload handles are released at the end.
let pending = std::mem::take(&mut self.pending_notify);
// Deferred flush jobs are discarded without being run.
let _ = std::mem::take(&mut self.deferred_flush_jobs);
for rec in self.nodes.values_mut() {
if rec.cache != NO_HANDLE {
self.binding.release_handle(rec.cache);
}
if let Some(TerminalKind::Error(h)) = rec.terminal {
self.binding.release_handle(h);
}
// Each dep record may hold a terminal error handle, batch handles and `prev_data`.
for dr in &rec.dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
self.binding.release_handle(h);
}
for &h in &dr.data_batch {
self.binding.release_handle(h);
}
if dr.prev_data != NO_HANDLE {
self.binding.release_handle(dr.prev_data);
}
}
// Messages still buffered by a paused node.
if let PauseState::Paused { buffer, .. } = &rec.pause_state {
for msg in buffer {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
for &h in &rec.replay_buffer {
self.binding.release_handle(h);
}
// Operator scratch releases its own handles.
if let Some(scratch) = rec.op_scratch.as_mut() {
scratch.release_handles(&*self.binding);
}
}
for entry in pending.values() {
for msg in entry.iter_messages() {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
}
}