use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
/// RAII guard returned by `Core::partition_wave_owner_lock_arc`.
///
/// Holds the partition's re-entrant `wave_owner` mutex guard and remembers
/// which partition it belongs to so that dropping the guard can give back the
/// matching entry in the thread-local `held_partitions` table.
pub(crate) struct WaveOwnerGuard {
/// Partition whose thread-local hold count is decremented on drop.
sid: crate::subgraph::SubgraphId,
/// The owned re-entrant mutex guard; never read, kept only so the lock stays
/// held for as long as this struct is alive.
#[allow(dead_code)]
inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
}
impl Drop for WaveOwnerGuard {
// Decrements this thread's hold count for `sid`. The boolean returned by
// `release` (whether the partition entry was removed) is deliberately
// ignored. The `inner` mutex guard is dropped afterwards, as part of the
// normal field drop that follows `Drop::drop`.
fn drop(&mut self) {
let _ = held_partitions::release(self.sid);
}
}
/// Per-thread bookkeeping of which partitions the current thread holds and
/// how many times each one has been (re-)acquired.
mod held_partitions {
    use crate::subgraph::SubgraphId;
    use smallvec::SmallVec;
    use std::cell::RefCell;

    /// Number of `(partition, count)` entries stored inline before spilling.
    const HELD_INLINE: usize = 4;

    thread_local! {
        /// `(partition, hold count)` pairs for the current thread.
        static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
            = const { RefCell::new(SmallVec::new_const()) };
    }

    /// Records an acquisition of `sid` on the current thread.
    ///
    /// Re-acquiring a partition that is already held just bumps its count.
    /// Acquiring a *new* partition is only allowed when it is strictly greater
    /// than every partition currently held.
    ///
    /// # Errors
    /// Returns `PartitionOrderViolation` when `sid` is not held yet and is
    /// `<=` the largest held partition.
    ///
    /// # Panics
    /// Panics if the per-partition hold count would overflow `u32`.
    pub(crate) fn check_and_acquire(sid: SubgraphId) -> Result<(), super::PartitionOrderViolation> {
        HELD.with(|cell| {
            let mut held = cell.borrow_mut();

            // Re-entrant path: the partition is already in the table.
            if let Some(entry) = held.iter_mut().find(|entry| entry.0 == sid) {
                entry.1 = entry.1.checked_add(1).expect(
                    "held_partitions refcount overflow — unbounded \
                     same-partition re-entrance. Should be bounded by the \
                     protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
                );
                return Ok(());
            }

            // New partition: it must sort above everything already held.
            match held.iter().map(|entry| entry.0).max() {
                Some(max_held) if sid <= max_held => Err(super::PartitionOrderViolation {
                    attempted: sid,
                    max_held,
                }),
                _ => {
                    held.push((sid, 1));
                    Ok(())
                }
            }
        })
    }

    /// Undoes one `check_and_acquire(sid)`.
    ///
    /// Returns `true` only when this call dropped the count to zero and the
    /// entry was removed. A release for a partition that is not in the table
    /// trips a `debug_assert!` and otherwise returns `false`.
    pub(crate) fn release(sid: SubgraphId) -> bool {
        HELD.with(|cell| {
            let mut held = cell.borrow_mut();
            let Some(slot) = held.iter().position(|entry| entry.0 == sid) else {
                debug_assert!(
                    false,
                    "held_partitions::release({sid:?}): sid not in HELD — \
                     double-drop or stray release (caller bug)"
                );
                return false;
            };
            debug_assert!(
                held[slot].1 > 0,
                "held_partitions::release({sid:?}): refcount underflow — \
                 release without matching check_and_acquire (caller bug)"
            );
            let remaining = held[slot].1.saturating_sub(1);
            held[slot].1 = remaining;
            if remaining == 0 {
                held.swap_remove(slot);
                true
            } else {
                false
            }
        })
    }

    /// Whether the current thread holds at least one partition.
    pub(crate) fn any_held() -> bool {
        let nothing_held = HELD.with(|cell| cell.borrow().is_empty());
        !nothing_held
    }

    /// Copy of the current thread's table, sorted by partition id.
    #[cfg(any(test, debug_assertions))]
    #[must_use]
    pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
        HELD.with(|cell| {
            let mut snapshot: Vec<(SubgraphId, u32)> = cell.borrow().to_vec();
            snapshot.sort_unstable_by_key(|entry| entry.0);
            snapshot
        })
    }
}
#[cfg(any(test, debug_assertions))]
pub use held_partitions::held_snapshot_for_tests;
use smallvec::SmallVec;
use thiserror::Error;
use crate::boundary::{BindingBoundary, CleanupTrigger};
use crate::clock::monotonic_ns;
use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
use crate::message::Message;
/// How a node (or one of its deps) reached its terminal state.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TerminalKind {
/// Terminated without an error payload.
Complete,
/// Terminated with an error; the handle identifies the error payload and is
/// released like any other handle when the record is cleaned up.
Error(HandleId),
}
/// Classification of a node, derived from its record by `NodeRecord::kind`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum NodeKind {
/// No deps, no fn, no operator.
State,
/// No deps, but has a fn.
Producer,
/// Has deps and is not flagged dynamic.
Derived,
/// Has deps and is flagged dynamic.
Dynamic,
/// Carries a built-in operator; takes precedence over the other kinds.
Operator(OperatorOp),
}
impl NodeKind {
    /// `true` only for operator nodes whose op is `Reduce`, `Last` or `Valve`;
    /// every other kind (and every other operator) yields `false`.
    pub(crate) fn skips_auto_cascade(self) -> bool {
        if let NodeKind::Operator(op) = self {
            matches!(
                op,
                OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
            )
        } else {
            false
        }
    }
}
/// Built-in operator selector stored on operator nodes.
///
/// Each variant carries the ids/handles the operator is parameterised with.
/// `Core::make_op_scratch_with_binding` decides which variants get per-node
/// scratch state; `Map`, `Filter`, `Combine`, `WithLatestFrom`, `Merge`, `Tap`
/// and `Valve` get none.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OperatorOp {
Map { fn_id: FnId },
Filter { fn_id: FnId },
/// `seed` must be a real handle; `NO_HANDLE` is rejected at registration.
Scan { fn_id: FnId, seed: HandleId },
/// `seed` must be a real handle; `NO_HANDLE` is rejected at registration.
Reduce { fn_id: FnId, seed: HandleId },
DistinctUntilChanged { equals_fn_id: FnId },
Pairwise { fn_id: FnId },
Combine { pack_fn: FnId },
WithLatestFrom { pack_fn: FnId },
Merge,
Take { count: u32 },
Skip { count: u32 },
TakeWhile { fn_id: FnId },
/// `default` may be `NO_HANDLE`; it is only retained when it is a real handle.
Last { default: HandleId },
Tap { fn_id: FnId },
TapFirst { fn_id: FnId },
Valve,
Settle {
quiet_waves: u32,
max_waves: Option<u32>,
},
}
/// Options accepted by `Core::register_operator`; both fields are forwarded
/// into the corresponding `NodeOpts` fields.
#[derive(Copy, Clone, Debug)]
pub struct OperatorOpts {
/// Equality mode stored on the node record.
pub equals: EqualsMode,
/// Stored on the node record as its `partial` flag.
pub partial: bool,
}
impl Default for OperatorOpts {
/// Identity equality, `partial` off.
fn default() -> Self {
Self {
equals: EqualsMode::Identity,
partial: false,
}
}
}
/// What drives a node: a bound function id or a built-in operator.
/// `Core::register` splits this into the record's `fn_id` / `op` fields, so a
/// record never has both set.
#[derive(Copy, Clone, Debug)]
pub enum NodeFnOrOp {
Fn(FnId),
Op(OperatorOp),
}
/// Per-node pause behaviour selector. It can be changed through
/// `Core::set_pausable_mode`, but only while the node is not paused.
// NOTE(review): the precise semantics of each mode are implemented outside
// this section of the file — confirm before documenting them further.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
#[default]
Default,
ResumeAll,
Off,
}
/// Options carried by a `NodeRegistration`.
#[derive(Copy, Clone, Debug)]
pub struct NodeOpts {
/// Initial cached value. Anything other than `NO_HANDLE` is only accepted
/// for state-shaped registrations (no deps, no fn, no op).
pub initial: HandleId,
/// Equality mode stored on the node record.
pub equals: EqualsMode,
/// Stored on the node record as its `partial` flag.
pub partial: bool,
/// Marks a fn node as dynamic; dynamic nodes start with an empty tracked set.
pub is_dynamic: bool,
/// Pause behaviour stored on the node record.
pub pausable: PausableMode,
/// Replay buffer capacity stored on the node record; `None` means no cap is set.
pub replay_buffer: Option<usize>,
}
impl Default for NodeOpts {
/// No initial value, identity equality, all flags off, default pause mode,
/// no replay buffer.
fn default() -> Self {
Self {
initial: NO_HANDLE,
equals: EqualsMode::Identity,
partial: false,
is_dynamic: false,
pausable: PausableMode::Default,
replay_buffer: None,
}
}
}
/// Full description of a node handed to `Core::register`.
#[derive(Clone, Debug)]
pub struct NodeRegistration {
/// Upstream nodes; every id must already be registered.
pub deps: Vec<NodeId>,
/// Function or operator driving the node; `None` for state nodes.
pub fn_or_op: Option<NodeFnOrOp>,
/// Remaining per-node options.
pub opts: NodeOpts,
}
/// Equality strategy stored on each node record.
// NOTE(review): where and how the comparison is performed is not visible in
// this section — presumably `Custom` delegates to the binding via the FnId.
#[derive(Copy, Clone, Debug)]
pub enum EqualsMode {
Identity,
Custom(FnId),
}
/// Key of a sink inside `NodeRecord::subscribers`.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct SubscriptionId(u64);
/// Handle for one registered sink. Dropping it removes the sink from its node
/// and, when it was the node's last subscriber, runs the deactivation path in
/// the `Drop` impl.
#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
pub struct Subscription {
/// Weak link to the core state; if the core is gone, drop is a no-op.
state: Weak<Mutex<CoreState>>,
/// Node the sink is attached to.
node_id: NodeId,
/// Key of the sink in the node's subscriber map.
sub_id: SubscriptionId,
}
impl Subscription {
/// Id of the node this subscription is attached to.
#[must_use]
pub fn node_id(&self) -> NodeId {
self.node_id
}
}
// Unsubscribes the sink and, if it was the node's last subscriber, tears the
// node's runtime state down in four phases:
//   1. (state lock held) remove the sink and snapshot the flags needed later;
//   2. (no lock) invoke the binding's cleanup / deactivate / wipe callbacks;
//   3. (state lock held) re-check that nobody re-subscribed, then collect every
//      handle owned by the record and reset its per-activation fields;
//   4. (no lock) release the collected handles and any detached scratch.
impl Drop for Subscription {
#[allow(clippy::too_many_lines)] fn drop(&mut self) {
// The core may already be gone; then there is nothing to unsubscribe from.
let Some(state) = self.state.upgrade() else {
return;
};
// Phase 1: mutate the subscriber map under the lock and capture the flags.
let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
let mut s = state.lock();
let Some(rec) = s.nodes.get_mut(&self.node_id) else {
return;
};
rec.subscribers.remove(&self.sub_id);
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
let last = rec.subscribers.is_empty();
let producer = rec.is_producer();
let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
// The binding is only needed on the last-subscriber path.
let binding = if last { Some(s.binding.clone()) } else { None };
(last, producer, user_cleanup, fire_wipe, binding)
};
if was_last_sub {
if let Some(binding) = binding {
// Phase 2: binding callbacks run with the state lock released.
if has_user_cleanup {
binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
}
if is_producer {
binding.producer_deactivate(self.node_id);
}
if fire_wipe {
binding.wipe_ctx(self.node_id);
}
// Phase 3: collect handles to release while holding the lock again.
let (to_release, scratch_to_release): (
Vec<HandleId>,
Option<Box<dyn crate::op_state::OperatorScratch>>,
) = {
let mut s = state.lock();
if let Some(rec) = s.nodes.get_mut(&self.node_id) {
// A new subscriber arrived while the lock was released: keep the
// record as it is and skip the teardown.
if !rec.subscribers.is_empty() {
return;
}
let mut handles: Vec<HandleId> = Vec::new();
// Per-dep state: previous value, pending batch and error terminal.
for dr in &mut rec.dep_records {
if dr.prev_data != NO_HANDLE {
handles.push(dr.prev_data);
dr.prev_data = NO_HANDLE;
}
for h in dr.data_batch.drain(..) {
handles.push(h);
}
if let Some(TerminalKind::Error(h)) = dr.terminal {
handles.push(h);
}
dr.terminal = None;
dr.dirty = false;
dr.involved_this_wave = false;
}
// Messages buffered while paused may carry payload handles.
if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
for msg in buffer.drain(..) {
if let Some(h) = msg.payload_handle() {
handles.push(h);
}
}
}
for h in rec.replay_buffer.drain(..) {
handles.push(h);
}
rec.pause_state = PauseState::Active;
// Only fn/op nodes drop their cache; a state node's cache is left intact.
let is_compute = rec.fn_id.is_some() || rec.op.is_some();
if is_compute && rec.cache != NO_HANDLE {
handles.push(rec.cache);
rec.cache = NO_HANDLE;
}
rec.has_fired_once = false;
rec.dirty = false;
rec.involved_this_wave = false;
rec.received_mask = 0;
rec.involved_mask = 0;
if rec.is_dynamic {
rec.tracked.clear();
}
// Operator scratch: non-resubscribable nodes hand their scratch out for
// release; resubscribable operator nodes get a fresh scratch installed
// and the old one is queued on `pending_scratch_release`.
// NOTE(review): after `take`, a non-resubscribable operator node is left
// with `op_scratch == None` even if it has not terminated — confirm that
// such a node can never be re-activated, or that the activation path
// rebuilds the scratch.
let scratch = if !rec.resubscribable {
std::mem::take(&mut rec.op_scratch)
} else if let Some(op) = rec.op {
let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
.expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
if let Some(old_box) = old {
s.pending_scratch_release.push(old_box);
}
None
} else {
None
};
(handles, scratch)
} else {
(Vec::new(), None)
}
};
// Phase 4: release outside the lock.
for h in to_release {
binding.release_handle(h);
}
if let Some(mut scratch) = scratch_to_release {
scratch.release_handles(&*binding);
}
}
}
}
}
// Compile-time check: the build fails if `Subscription` stops being
// `Send + Sync`. The closure is never called.
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Subscription>();
};
/// Subscriber callback: a shared, thread-safe closure invoked with a slice of
/// messages.
pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
/// Pause status of a node.
#[derive(Debug)]
pub(crate) enum PauseState {
/// Not paused; no locks are held.
Active,
/// Paused by at least one lock.
Paused {
/// Distinct lock ids currently holding the pause.
locks: SmallVec<[LockId; 2]>,
/// Messages queued by `push_buffered` while paused.
buffer: VecDeque<Message>,
/// Number of buffered messages evicted because of the cap (saturating).
dropped: u32,
/// `monotonic_ns()` reading taken when the node entered the paused state.
started_at_ns: u64,
/// Set once the first overflow of this pause has been reported.
overflow_reported: bool,
/// Flag set by `mark_pending_wave` and consumed by `take_pending_wave`.
pending_wave: bool,
},
}
impl PauseState {
    /// Whether the node is currently paused.
    pub(crate) fn is_paused(&self) -> bool {
        match self {
            Self::Paused { .. } => true,
            Self::Active => false,
        }
    }

    /// Number of locks holding the pause (0 when active).
    fn lock_count(&self) -> usize {
        if let Self::Paused { locks, .. } = self {
            locks.len()
        } else {
            0
        }
    }

    /// Whether `lock_id` is one of the locks holding the pause.
    fn contains_lock(&self, lock_id: LockId) -> bool {
        matches!(self, Self::Paused { locks, .. } if locks.contains(&lock_id))
    }

    /// Adds `lock_id` to the pause. An active node becomes paused with a fresh
    /// buffer and timestamp; an already-paused node only gains the lock if it
    /// does not hold it yet.
    fn add_lock(&mut self, lock_id: LockId) {
        if let Self::Paused { locks, .. } = self {
            if !locks.contains(&lock_id) {
                locks.push(lock_id);
            }
            return;
        }
        let mut locks = SmallVec::new();
        locks.push(lock_id);
        *self = Self::Paused {
            locks,
            buffer: VecDeque::new(),
            dropped: 0,
            started_at_ns: monotonic_ns(),
            overflow_reported: false,
            pending_wave: false,
        };
    }

    /// Raises the pending-wave flag; a no-op while active.
    pub(crate) fn mark_pending_wave(&mut self) {
        match self {
            Self::Paused { pending_wave, .. } => *pending_wave = true,
            Self::Active => {}
        }
    }

    /// Returns the pending-wave flag and clears it; `false` while active.
    pub(crate) fn take_pending_wave(&mut self) -> bool {
        match self {
            Self::Paused { pending_wave, .. } => std::mem::take(pending_wave),
            Self::Active => false,
        }
    }

    /// Removes `lock_id` from the pause. When that leaves no locks, the state
    /// flips back to `Active` and the buffered messages plus the dropped count
    /// are returned; otherwise returns `None`.
    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
        let Self::Paused { locks, .. } = self else {
            return None;
        };
        if let Some(idx) = locks.iter().position(|held| *held == lock_id) {
            locks.swap_remove(idx);
        }
        if !locks.is_empty() {
            return None;
        }
        match std::mem::replace(self, Self::Active) {
            Self::Paused {
                buffer, dropped, ..
            } => Some((buffer, dropped)),
            Self::Active => None,
        }
    }

    /// Queues `msg` while paused, evicting from the front until the buffer fits
    /// within `cap` (when one is given). The result lists the evicted messages
    /// and flags the first overflow of the current pause. While active, nothing
    /// is queued and the default (empty) result is returned.
    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
        let mut outcome = PushBufferedResult::default();
        let Self::Paused {
            buffer,
            dropped,
            overflow_reported,
            ..
        } = self
        else {
            return outcome;
        };
        buffer.push_back(msg);
        if let Some(limit) = cap {
            while buffer.len() > limit {
                if let Some(evicted) = buffer.pop_front() {
                    outcome.dropped_msgs.push(evicted);
                }
                *dropped = dropped.saturating_add(1);
            }
        }
        let overflowed = !outcome.dropped_msgs.is_empty();
        if overflowed && !*overflow_reported {
            *overflow_reported = true;
            outcome.first_overflow_this_cycle = true;
        }
        outcome
    }

    /// While paused, returns `(dropped count, nanoseconds since the pause
    /// started)`; `None` while active.
    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
        let Self::Paused {
            dropped,
            started_at_ns,
            ..
        } = self
        else {
            return None;
        };
        let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
        Some((*dropped, lock_held_ns))
    }
}
/// Outcome of `PauseState::push_buffered`.
#[derive(Default)]
pub(crate) struct PushBufferedResult {
/// Messages evicted from the front of the pause buffer by this push.
pub(crate) dropped_msgs: Vec<Message>,
/// `true` when this push caused the first eviction of the current pause.
pub(crate) first_overflow_this_cycle: bool,
}
/// Diagnostic record describing a pause-buffer overflow on one node.
// NOTE(review): the producer and consumer of this record are outside this
// section of the file; field meanings below follow the field names and the
// `(dropped, lock_held_ns)` pair returned by `PauseState::overflow_diagnostic`.
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
pub(crate) node_id: NodeId,
pub(crate) dropped_count: u32,
pub(crate) configured_max: usize,
pub(crate) lock_held_ns: u64,
}
/// Returned when a thread tries to acquire a partition it does not already
/// hold whose id is not strictly greater than every partition it holds.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[error(
"Phase H+ ascending-order violation: attempted partition {attempted:?} \
while already holding partition {max_held:?} — defer to wave-end"
)]
pub struct PartitionOrderViolation {
/// Partition the thread tried to acquire.
pub attempted: crate::subgraph::SubgraphId,
/// Largest partition the thread held at that moment.
pub max_held: crate::subgraph::SubgraphId,
}
/// Failure modes of `Core::try_subscribe`.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SubscribeError {
/// Acquiring the node's partition would break the ascending lock order.
#[error(transparent)]
PartitionOrderViolation(#[from] PartitionOrderViolation),
/// The node has terminated and is not resubscribable.
#[error(
"subscribe({node:?}): node is non-resubscribable and has terminated; \
the stream is permanently over (R2.2.7.b)"
)]
TornDown {
node: NodeId,
},
}
/// Operation queued with `Core::push_deferred_producer_op` and executed later
/// by `Core::drain_deferred_producer_ops`.
pub enum DeferredProducerOp {
/// Runs `emit(node_id, handle)`, then releases `handle`.
Emit { node_id: NodeId, handle: HandleId },
/// Runs `complete(node_id)`.
Complete { node_id: NodeId },
/// Runs `error(node_id, handle)`, then releases `handle`.
Error { node_id: NodeId, handle: HandleId },
/// Runs the boxed closure.
Callback(Box<dyn FnOnce() + Send>),
}
/// Error type for pause/resume operations.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum PauseError {
/// The node id is not registered.
#[error("pause/resume: unknown node {0:?}")]
UnknownNode(NodeId),
}
/// Failure modes of `Core::up`.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum UpError {
/// The node id is not registered.
#[error("up: unknown node {0:?}")]
UnknownNode(NodeId),
/// The message's tier (3 or 5) may not travel upstream.
#[error(
"up: tier {tier} is forbidden upstream — value (tier 3) and \
terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
)]
TierForbidden { tier: u8 },
}
/// Failure modes of `Core::register` and its convenience wrappers.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
/// A dep id is not registered.
#[error("register: unknown dep {0:?}")]
UnknownDep(NodeId),
/// An operator registration had an empty dep list.
#[error(
"register: operator nodes require at least one dep — \
use register_producer for subscription-managed combinators"
)]
OperatorWithoutDeps,
/// A non-state registration supplied an initial value.
#[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
InitialOnlyForStateNodes,
/// A dep has terminated and is not resubscribable.
#[error(
"register: dep {0:?} is terminal and not resubscribable; \
mark it resubscribable before terminating, or remove it from the dep list"
)]
TerminalDep(NodeId),
/// A `Scan`/`Reduce` operator was given `NO_HANDLE` as its seed.
#[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
OperatorSeedSentinel,
}
/// Failure modes of `Core::set_pausable_mode`.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
/// The node id is not registered.
#[error("set_pausable_mode: unknown node {0:?}")]
UnknownNode(NodeId),
/// The node is currently paused; the mode can only change while active.
#[error(
"set_pausable_mode: cannot change pausable mode while paused; \
resume all locks first"
)]
WhilePaused,
}
/// Per-dependency state kept by a consumer node, one entry per dep.
pub(crate) struct DepRecord {
/// Id of the upstream node.
pub(crate) node: NodeId,
/// Previously seen value handle from this dep; `NO_HANDLE` when none.
pub(crate) prev_data: HandleId,
/// Dirty flag for this dep; reset when the consumer deactivates.
pub(crate) dirty: bool,
/// Per-wave involvement flag; reset when the consumer deactivates.
pub(crate) involved_this_wave: bool,
/// Handles received from this dep and not yet consumed
/// (presumably for the current wave — verify against the wave engine).
pub(crate) data_batch: SmallVec<[HandleId; 1]>,
/// Terminal state observed from this dep, if any.
pub(crate) terminal: Option<TerminalKind>,
}
impl DepRecord {
fn new(node: NodeId) -> Self {
Self {
node,
prev_data: NO_HANDLE,
dirty: false,
involved_this_wave: false,
data_batch: SmallVec::new(),
terminal: None,
}
}
}
/// Everything the core stores about one registered node.
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
/// One entry per dep, in registration order.
pub(crate) dep_records: Vec<DepRecord>,
/// Bound function id; mutually exclusive with `op` as set by `register`.
pub(crate) fn_id: Option<FnId>,
/// Built-in operator; mutually exclusive with `fn_id` as set by `register`.
pub(crate) op: Option<OperatorOp>,
/// Copied from `NodeOpts::is_dynamic`.
pub(crate) is_dynamic: bool,
pub(crate) equals: EqualsMode,
/// Current cached value handle; `NO_HANDLE` when empty.
pub(crate) cache: HandleId,
/// Starts as `initial != NO_HANDLE`; cleared again on deactivation.
pub(crate) has_fired_once: bool,
/// Registered sinks keyed by subscription id.
pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
/// Bumped (wrapping) on every subscriber insert/remove.
pub(crate) subscribers_revision: u64,
/// Indices into `dep_records`; all deps for operator and static fn nodes,
/// initially empty for dynamic nodes.
pub(crate) tracked: HashSet<usize>,
pub(crate) dirty: bool,
pub(crate) involved_this_wave: bool,
pub(crate) pause_state: PauseState,
pub(crate) pausable: PausableMode,
/// Replay buffer capacity; `None` means no cap is set.
pub(crate) replay_buffer_cap: Option<usize>,
/// Handles kept for replay to new subscribers, oldest first.
pub(crate) replay_buffer: VecDeque<HandleId>,
/// Terminal state of the node itself, if it has terminated.
pub(crate) terminal: Option<TerminalKind>,
pub(crate) has_received_teardown: bool,
/// Whether a terminated node may be subscribed to again (starts `false`).
pub(crate) resubscribable: bool,
/// Companion nodes that are traversed alongside children when computing
/// touched partitions.
pub(crate) meta_companions: Vec<NodeId>,
pub(crate) partial: bool,
/// 0 for dep-less nodes, otherwise one more than the largest dep rank.
pub(crate) topo_rank: u32,
/// Bitmask compared against the all-deps mask in `has_sentinel_deps`
/// (only meaningful for nodes with at most 64 deps).
pub(crate) received_mask: u64,
pub(crate) involved_mask: u64,
/// Operator scratch state, for operators that need any.
pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
impl NodeRecord {
    /// State node: no deps, no fn, no operator.
    pub(crate) fn is_state(&self) -> bool {
        let has_no_logic = self.fn_id.is_none() && self.op.is_none();
        self.dep_records.is_empty() && has_no_logic
    }

    /// Producer node: no deps, a fn, no operator.
    pub(crate) fn is_producer(&self) -> bool {
        if !self.dep_records.is_empty() {
            return false;
        }
        self.fn_id.is_some() && self.op.is_none()
    }

    /// Compute node: has deps and either a fn or an operator.
    #[allow(dead_code)]
    pub(crate) fn is_compute(&self) -> bool {
        let has_logic = self.fn_id.is_some() || self.op.is_some();
        !self.dep_records.is_empty() && has_logic
    }

    /// Whether the node carries a built-in operator.
    #[allow(dead_code)]
    pub(crate) fn is_operator(&self) -> bool {
        matches!(self.op, Some(_))
    }

    /// Delegates to `NodeKind::skips_auto_cascade` for operator nodes;
    /// `false` for everything else.
    pub(crate) fn skips_auto_cascade(&self) -> bool {
        self.op
            .is_some_and(|op| NodeKind::Operator(op).skips_auto_cascade())
    }

    /// Classifies the node. An operator wins over everything else; dep-less
    /// nodes are producers (with fn) or state (without); nodes with deps are
    /// dynamic or derived according to `is_dynamic`.
    pub(crate) fn kind(&self) -> NodeKind {
        match (self.op, self.dep_records.is_empty()) {
            (Some(op), _) => NodeKind::Operator(op),
            (None, true) if self.fn_id.is_some() => NodeKind::Producer,
            (None, true) => NodeKind::State,
            (None, false) if self.is_dynamic => NodeKind::Dynamic,
            (None, false) => NodeKind::Derived,
        }
    }

    /// Dep ids in registration order.
    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
        self.dep_records.iter().map(|dep| dep.node)
    }

    /// Dep ids in registration order, collected into a `Vec`.
    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
        let mut ids = Vec::with_capacity(self.dep_records.len());
        ids.extend(self.dep_ids());
        ids
    }

    /// Whether at least one dep has not delivered a value yet.
    ///
    /// With up to 64 deps this compares `received_mask` against the mask with
    /// one bit per dep; with more deps it falls back to scanning for a dep
    /// that has neither a previous value nor a pending batch. Dep-less nodes
    /// always return `false`.
    pub(crate) fn has_sentinel_deps(&self) -> bool {
        match self.dep_records.len() {
            0 => false,
            64 => self.received_mask != u64::MAX,
            n if n < 64 => self.received_mask != (1u64 << n) - 1,
            _ => self
                .dep_records
                .iter()
                .any(|dep| dep.prev_data == NO_HANDLE && dep.data_batch.is_empty()),
        }
    }

    /// Position of `dep_id` in the dep list, if it is a dep of this node.
    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
        self.dep_records.iter().position(|dep| dep.node == dep_id)
    }

    /// `true` when the node has deps and every one of them has terminated.
    pub(crate) fn all_deps_terminal(&self) -> bool {
        if self.dep_records.is_empty() {
            return false;
        }
        self.dep_records.iter().all(|dep| dep.terminal.is_some())
    }
}
/// Mutable state shared by every clone of a `Core`, guarded by one mutex.
pub(crate) struct CoreState {
/// Next node id to hand out (starts at 1).
pub(crate) next_node_id: u64,
/// Next subscription id to hand out (starts at 1).
pub(crate) next_subscription_id: u64,
/// Next lock id handed out by `Core::alloc_lock_id` (starts at `1 << 31`).
pub(crate) next_lock_id: u64,
/// All registered nodes.
pub(crate) nodes: HashMap<NodeId, NodeRecord>,
/// Reverse edges: for each node, the nodes that list it as a dep.
pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
/// Cap set through `Core::set_pause_buffer_cap`; `None` by default.
pub(crate) pause_buffer_cap: Option<usize>,
/// Limit set through `Core::set_max_batch_drain_iterations` (default 10 000).
pub(crate) max_batch_drain_iterations: u32,
pub(crate) currently_firing: Vec<NodeId>,
/// Same binding as `Core::binding`, reachable from code that only holds the state.
pub(crate) binding: Arc<dyn BindingBoundary>,
pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
pub(crate) next_topology_id: u64,
/// Operator scratch boxes replaced under the lock and awaiting handle release.
pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
}
/// Process-wide counter; each `Core::new` takes the next value as its
/// `generation`. Starts at 1.
static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);
/// Cheaply clonable handle to the graph core; clones share all state through
/// the `Arc`s below.
#[derive(Clone)]
pub struct Core {
/// Shared mutable graph state.
pub(crate) state: Arc<Mutex<CoreState>>,
/// Binding used for handle retain/release and node callbacks.
pub(crate) binding: Arc<dyn BindingBoundary>,
/// Queue filled by `push_deferred_producer_op`, emptied by
/// `drain_deferred_producer_ops`.
pub(crate) deferred_producer_ops: Arc<parking_lot::Mutex<Vec<DeferredProducerOp>>>,
/// Partition registry.
pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
/// Value drawn from `CORE_GENERATION` when the core was created.
pub(crate) generation: u64,
}
/// Non-owning counterpart of `Core`, produced by `Core::weak_handle`. Every
/// shared component is held weakly; `generation` is copied by value.
#[derive(Clone)]
pub struct WeakCore {
state: Weak<Mutex<CoreState>>,
binding: Weak<dyn BindingBoundary>,
deferred_producer_ops: Weak<parking_lot::Mutex<Vec<DeferredProducerOp>>>,
registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
generation: u64,
}
impl WeakCore {
    /// Tries to turn this weak handle back into a `Core`.
    ///
    /// Returns `None` as soon as any of the shared components has already been
    /// dropped.
    #[must_use]
    pub fn upgrade(&self) -> Option<Core> {
        let state = self.state.upgrade()?;
        let binding = self.binding.upgrade()?;
        let deferred_producer_ops = self.deferred_producer_ops.upgrade()?;
        let registry = self.registry.upgrade()?;
        Some(Core {
            state,
            binding,
            deferred_producer_ops,
            registry,
            generation: self.generation,
        })
    }
}
/// Drop guard for operator scratch that has been built but not yet installed
/// on a node: unless the scratch is taken out first, dropping the guard
/// releases the handles the scratch holds.
struct ScratchReleaseGuard<'a> {
/// The guarded scratch; `None` once taken (or if there was none).
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
/// Binding used to release the scratch's handles.
binding: &'a dyn BindingBoundary,
}
impl<'a> ScratchReleaseGuard<'a> {
fn new(
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
binding: &'a dyn BindingBoundary,
) -> Self {
Self { scratch, binding }
}
fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
self.scratch.take()
}
}
impl Drop for ScratchReleaseGuard<'_> {
    /// Releases the handles of a scratch that was never taken out.
    fn drop(&mut self) {
        let Some(mut leftover) = self.scratch.take() else {
            return;
        };
        leftover.release_handles(self.binding);
    }
}
impl Core {
/// Creates an empty core bound to `binding`.
///
/// Node, subscription and topology ids start at 1; lock ids start at
/// `1 << 31`. The pause buffer is uncapped and the batch-drain limit is
/// 10 000 iterations. Each core takes a fresh `generation` from the
/// process-wide counter.
#[must_use]
pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
Self {
state: Arc::new(Mutex::new(CoreState {
next_node_id: 1,
next_subscription_id: 1,
// NOTE(review): presumably offset so core-allocated lock ids do not
// collide with caller-chosen ones — confirm the intended range.
next_lock_id: 1u64 << 31,
nodes: HashMap::new(),
children: HashMap::new(),
currently_firing: Vec::new(),
pause_buffer_cap: None,
max_batch_drain_iterations: 10_000,
binding: binding.clone(),
topology_sinks: HashMap::new(),
next_topology_id: 1,
pending_scratch_release: Vec::new(),
})),
binding,
deferred_producer_ops: Arc::new(parking_lot::Mutex::new(Vec::new())),
registry: Arc::new(parking_lot::Mutex::new(
crate::subgraph::SubgraphRegistry::new(),
)),
generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
}
}
/// Locks the shared core state and returns the guard.
pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
self.state.lock()
}
/// Whether `self` and `other` are handles to the same core, decided by
/// pointer identity of the shared state.
#[must_use]
pub fn same_dispatcher(&self, other: &Core) -> bool {
Arc::ptr_eq(&self.state, &other.state)
}
/// Creates a `WeakCore` that does not keep any of the shared components
/// alive; `generation` is copied over.
#[must_use]
pub fn weak_handle(&self) -> WeakCore {
WeakCore {
state: Arc::downgrade(&self.state),
binding: Arc::downgrade(&self.binding),
deferred_producer_ops: Arc::downgrade(&self.deferred_producer_ops),
registry: Arc::downgrade(&self.registry),
generation: self.generation,
}
}
/// Number of components currently reported by the partition registry.
#[must_use]
pub fn partition_count(&self) -> usize {
self.registry.lock().component_count()
}
/// Partition the registry reports for `node`, or `None` if it reports none.
#[must_use]
pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
self.registry.lock().partition_of(node)
}
/// Appends `op` to the deferred queue; it runs the next time
/// `drain_deferred_producer_ops` actually drains.
pub fn push_deferred_producer_op(&self, op: DeferredProducerOp) {
self.deferred_producer_ops.lock().push(op);
}
/// Upper bound on how many batches one drain call may process.
const MAX_DEFERRED_DRAIN_ITERATIONS: u32 = 1000;

/// Runs every queued `DeferredProducerOp`, including ops queued by the ops
/// themselves, batch by batch.
///
/// Does nothing while the current thread still holds any partition. Each
/// batch is taken out of the queue first, so the queue lock is not held while
/// the ops execute. Handles carried by `Emit` / `Error` ops are released
/// after the corresponding call.
///
/// # Panics
/// Panics when more than `MAX_DEFERRED_DRAIN_ITERATIONS` batches are needed.
pub(crate) fn drain_deferred_producer_ops(&self) {
    if held_partitions::any_held() {
        return;
    }
    let mut iterations = 0u32;
    loop {
        // The lock guard is a temporary: it is gone before any op runs.
        let batch: Vec<DeferredProducerOp> =
            std::mem::take(&mut *self.deferred_producer_ops.lock());
        if batch.is_empty() {
            return;
        }
        iterations += 1;
        assert!(
            iterations <= Self::MAX_DEFERRED_DRAIN_ITERATIONS,
            "drain_deferred_producer_ops exceeded {} iterations — \
             a deferred callback is likely pushing unbounded ops. \
             Iteration {iterations}, batch size {}.",
            Self::MAX_DEFERRED_DRAIN_ITERATIONS,
            batch.len(),
        );
        for op in batch {
            match op {
                DeferredProducerOp::Callback(callback) => callback(),
                DeferredProducerOp::Complete { node_id } => {
                    self.complete(node_id);
                }
                DeferredProducerOp::Emit { node_id, handle } => {
                    self.emit(node_id, handle);
                    self.binding.release_handle(handle);
                }
                DeferredProducerOp::Error { node_id, handle } => {
                    self.error(node_id, handle);
                    self.binding.release_handle(handle);
                }
            }
        }
    }
}
/// Acquires the wave-owner lock of the partition that currently contains
/// `seed` and returns it wrapped in a `WaveOwnerGuard`.
///
/// Because the partition of `seed` can change between looking it up and
/// obtaining its lock, the acquisition is validated afterwards and retried up
/// to `MAX_LOCK_RETRIES` times.
///
/// # Errors
/// Returns `PartitionOrderViolation` when taking the partition would break
/// the per-thread ascending acquisition order.
///
/// # Panics
/// Panics if `seed` is not known to the registry, or when every retry failed
/// validation.
pub(crate) fn partition_wave_owner_lock_arc(
&self,
seed: NodeId,
) -> Result<WaveOwnerGuard, PartitionOrderViolation> {
// Rolls back the thread-local hold taken by `check_and_acquire` unless
// the acquisition succeeds and ownership passes to the `WaveOwnerGuard`.
struct AcquireGuard {
sid: crate::subgraph::SubgraphId,
consumed: bool,
}
impl AcquireGuard {
// Marks the hold as handed over; the guard is dropped right after,
// and its `Drop` then skips the release.
fn into_consumed(mut self) {
self.consumed = true;
}
}
impl Drop for AcquireGuard {
fn drop(&mut self) {
if !self.consumed {
held_partitions::release(self.sid);
}
}
}
for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
// Look up the partition and its lock box; the registry lock is released
// at the end of this block, before blocking on the wave-owner lock.
let (sid, lock_box) = {
let mut reg = self.registry.lock();
reg.lock_for(seed).expect(
"partition_wave_owner_lock_arc: seed must be registered \
(P12-fix invariant: registry membership is published \
atomically with `s.nodes`)",
)
};
// Ordering check happens before blocking; a violation returns immediately.
held_partitions::check_and_acquire(sid)?;
let acquire_guard = AcquireGuard {
sid,
consumed: false,
};
let inner = lock_box.wave_owner.lock_arc();
// The lock box may have gone stale while we were waiting for the lock.
let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
if still_valid {
acquire_guard.into_consumed();
drop(lock_box);
return Ok(WaveOwnerGuard { sid, inner });
}
// Stale: unlock, undo the thread-local hold, and try again.
drop(inner);
drop(acquire_guard);
std::thread::yield_now();
}
panic!(
"partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
— pathological concurrent union/split activity. Mirrors py \
`_MAX_LOCK_RETRIES`.",
crate::subgraph::MAX_LOCK_RETRIES,
seed
);
}
/// Collects the partitions of every node reachable from `seed` by following
/// child edges and meta-companion links (including `seed` itself).
///
/// The result contains each partition once, sorted by raw id. The state lock
/// and the registry lock are both held for the duration of the walk.
pub(crate) fn compute_touched_partitions(
    &self,
    seed: NodeId,
) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
    let state = self.lock_state();
    let mut registry = self.registry.lock();
    let mut touched: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
    let mut seen: HashSet<NodeId> = HashSet::default();
    let mut pending: SmallVec<[NodeId; 16]> = SmallVec::new();
    pending.push(seed);
    while let Some(node) = pending.pop() {
        let first_visit = seen.insert(node);
        if !first_visit {
            continue;
        }
        match registry.partition_of(node) {
            Some(sid) if !touched.contains(&sid) => touched.push(sid),
            _ => {}
        }
        if let Some(kids) = state.children.get(&node) {
            pending.extend(kids.iter().copied());
        }
        if let Some(rec) = state.nodes.get(&node) {
            pending.extend(rec.meta_companions.iter().copied());
        }
    }
    touched.sort_unstable_by_key(|sid| sid.raw());
    touched
}
/// Returns whatever the registry's `all_partitions` reports: pairs of
/// partition id and shared lock box.
pub(crate) fn all_partitions_lock_boxes(
&self,
) -> Vec<(
crate::subgraph::SubgraphId,
Arc<crate::subgraph::SubgraphLockBox>,
)> {
self.registry.lock().all_partitions()
}
}
/// Returns every node connected to `start` when dep edges are treated as
/// undirected (`start` included).
///
/// * `skip_edge` — an optional `(parent, child)` dep edge that is ignored in
///   both directions.
/// * `extra_edges` — additional `(a, b)` edges followed in both directions,
///   as if they were part of the graph.
pub(crate) fn walk_undirected_dep_graph(
    s: &CoreState,
    start: NodeId,
    skip_edge: Option<(NodeId, NodeId)>,
    extra_edges: &[(NodeId, NodeId)],
) -> HashSet<NodeId> {
    // `true` when the parent→child edge is the one the caller wants ignored.
    let edge_is_skipped = |parent: NodeId, child: NodeId| {
        skip_edge.is_some_and(|(sp, sc)| parent == sp && child == sc)
    };

    let mut reached: HashSet<NodeId> = HashSet::default();
    let mut frontier: SmallVec<[NodeId; 32]> = SmallVec::new();
    frontier.push(start);

    while let Some(cur) = frontier.pop() {
        if !reached.insert(cur) {
            continue;
        }
        // Downstream: nodes that consume `cur`.
        if let Some(consumers) = s.children.get(&cur) {
            frontier.extend(
                consumers
                    .iter()
                    .copied()
                    .filter(|&child| !edge_is_skipped(cur, child) && !reached.contains(&child)),
            );
        }
        // Upstream: the deps of `cur`.
        if let Some(rec) = s.nodes.get(&cur) {
            frontier.extend(
                rec.dep_records
                    .iter()
                    .map(|dep| dep.node)
                    .filter(|&parent| !edge_is_skipped(parent, cur) && !reached.contains(&parent)),
            );
        }
        // Caller-supplied edges, followed from either endpoint.
        for &(left, right) in extra_edges {
            if cur == left && !reached.contains(&right) {
                frontier.push(right);
            }
            if cur == right && !reached.contains(&left) {
                frontier.push(left);
            }
        }
    }
    reached
}
impl Core {
/// Number of batches currently queued for `node` in the wave state's
/// `pending_notify` map, or `None` when the node has no entry there.
#[cfg(any(test, debug_assertions))]
#[must_use]
pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
    crate::batch::with_wave_state(|ws| {
        let entry = ws.pending_notify.get(&node)?;
        Some(entry.batches.len())
    })
}
/// Stores `cap` as the core-wide pause buffer cap; `None` removes the cap.
pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
self.lock_state().pause_buffer_cap = cap;
}
/// Sets the replay buffer capacity of `node_id` and trims the buffer to fit.
///
/// `Some(0)` is treated like `None`. With no cap the whole buffer is emptied;
/// with a cap the oldest entries are evicted until the buffer fits. Evicted
/// handles are released through the binding after the state lock has been
/// dropped.
pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
    let effective_cap = cap.filter(|&limit| limit != 0);
    let evicted: Vec<HandleId> = {
        let mut state = self.lock_state();
        let rec = state.require_node_mut(node_id);
        rec.replay_buffer_cap = effective_cap;
        let buffered = rec.replay_buffer.len();
        let excess = match effective_cap {
            None => buffered,
            Some(limit) => buffered.saturating_sub(limit),
        };
        // Draining the front range evicts oldest-first.
        rec.replay_buffer.drain(..excess).collect()
    };
    evicted
        .into_iter()
        .for_each(|handle| self.binding.release_handle(handle));
}
/// Changes the pause mode of `node_id`.
///
/// # Errors
/// * `UnknownNode` — the node is not registered.
/// * `WhilePaused` — the node is currently paused.
pub fn set_pausable_mode(
    &self,
    node_id: NodeId,
    mode: PausableMode,
) -> Result<(), SetPausableModeError> {
    let mut state = self.lock_state();
    let Some(rec) = state.nodes.get_mut(&node_id) else {
        return Err(SetPausableModeError::UnknownNode(node_id));
    };
    if rec.pause_state.is_paused() {
        Err(SetPausableModeError::WhilePaused)
    } else {
        rec.pausable = mode;
        Ok(())
    }
}
/// Sets the batch-drain iteration limit stored in the core state.
///
/// # Panics
/// Panics if `cap` is 0.
pub fn set_max_batch_drain_iterations(&self, cap: u32) {
assert!(cap > 0, "max_batch_drain_iterations must be > 0");
self.lock_state().max_batch_drain_iterations = cap;
}
/// Sends `message` upstream from `node_id` to each of its deps.
///
/// `Pause`, `Resume`, `Invalidate` and `Teardown` are forwarded to the
/// matching core operation on every dep; errors from pause/resume are
/// ignored. Any other permitted message is accepted and does nothing.
///
/// # Errors
/// * `UnknownNode` — `node_id` is not registered (checked first).
/// * `TierForbidden` — the message's tier is 3 or 5.
pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
    // Snapshot the dep list so the state lock is not held while forwarding.
    let upstream: Vec<NodeId> = {
        let state = self.lock_state();
        let Some(rec) = state.nodes.get(&node_id) else {
            return Err(UpError::UnknownNode(node_id));
        };
        rec.dep_ids_vec()
    };
    let tier = message.tier();
    if matches!(tier, 3 | 5) {
        return Err(UpError::TierForbidden { tier });
    }
    for dep_id in upstream {
        match message {
            Message::Invalidate => {
                self.invalidate(dep_id);
            }
            Message::Teardown => {
                self.teardown(dep_id);
            }
            Message::Pause(lock) => {
                let _ = self.pause(dep_id, lock);
            }
            Message::Resume(lock) => {
                let _ = self.resume(dep_id, lock);
            }
            _ => {}
        }
    }
    Ok(())
}
/// Hands out the next core-allocated lock id and advances the counter.
#[must_use]
pub fn alloc_lock_id(&self) -> LockId {
    let mut state = self.lock_state();
    let raw = state.next_lock_id;
    let id = LockId::new(raw);
    state.next_lock_id = raw + 1;
    id
}
/// Borrow of the binding this core was created with.
#[must_use]
pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
&self.binding
}
/// Registers a node described by `reg` and returns its id.
///
/// Validation happens in this order: operator-without-deps, initial value on
/// a non-state node, operator seed (while building the operator scratch),
/// unknown deps, then terminal non-resubscribable deps. Nothing is inserted
/// unless every check passes; operator scratch built before a later check
/// fails has its handles released by `ScratchReleaseGuard`.
///
/// On success the node record, its reverse edges and its partition
/// membership (unioned with every dep) are published while the state lock is
/// held, and a `NodeRegistered` topology event is fired after the lock has
/// been released.
///
/// A replay buffer capacity of `Some(0)` is stored as `None`, matching
/// `set_replay_buffer_cap`.
///
/// # Errors
/// * `OperatorWithoutDeps` — an operator was given an empty dep list.
/// * `InitialOnlyForStateNodes` — `opts.initial` is set on a non-state node.
/// * `OperatorSeedSentinel` — a `Scan`/`Reduce` seed is `NO_HANDLE`.
/// * `UnknownDep` — a dep id is not registered.
/// * `TerminalDep` — a dep has terminated and is not resubscribable.
#[allow(clippy::too_many_lines)]
pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
    let NodeRegistration {
        deps,
        fn_or_op,
        opts,
    } = reg;
    let NodeOpts {
        initial,
        equals,
        partial,
        is_dynamic,
        pausable,
        replay_buffer,
    } = opts;
    // A zero-sized replay buffer can never hold anything. Store it as "no
    // buffer", exactly like `set_replay_buffer_cap` does, so the record never
    // ends up in a state the setter itself would not produce.
    let replay_buffer_cap = replay_buffer.filter(|&cap| cap > 0);
    let (fn_id, op) = match fn_or_op {
        Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
        Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
        None => (None, None),
    };
    let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
    if op.is_some() && deps.is_empty() {
        return Err(RegisterError::OperatorWithoutDeps);
    }
    if initial != NO_HANDLE && !is_state_shape {
        return Err(RegisterError::InitialOnlyForStateNodes);
    }
    // Build the scratch before taking the state lock; this may retain handles.
    let scratch = match op {
        Some(operator_op) => self.make_op_scratch(operator_op)?,
        None => None,
    };
    // Declared before the lock guard so that, on an early return, the lock is
    // dropped first and the scratch handles are released outside of it.
    let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
    let mut s = self.lock_state();
    // Two passes on purpose: an unknown dep anywhere in the list is reported
    // before any terminal dep.
    for &dep in &deps {
        if !s.nodes.contains_key(&dep) {
            return Err(RegisterError::UnknownDep(dep));
        }
    }
    for &dep in &deps {
        let dep_rec = s.require_node(dep);
        if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
            return Err(RegisterError::TerminalDep(dep));
        }
    }
    // All checks passed: from here on the record owns the scratch.
    let installed_scratch = scratch_guard.take();
    let id = s.alloc_node_id();
    // Operator nodes and static fn nodes track every dep; dynamic nodes and
    // nodes without a fn start with nothing tracked.
    let tracked: HashSet<usize> = if op.is_some() {
        (0..deps.len()).collect()
    } else if is_dynamic {
        HashSet::new()
    } else if fn_id.is_some() && !deps.is_empty() {
        (0..deps.len()).collect()
    } else {
        HashSet::new()
    };
    let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
    // Rank 0 for dep-less nodes, otherwise one above the highest-ranked dep.
    let topo_rank = if deps.is_empty() {
        0
    } else {
        deps.iter()
            .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
            .max()
            .unwrap_or(0)
            .saturating_add(1)
    };
    let rec = NodeRecord {
        dep_records,
        fn_id,
        op,
        is_dynamic,
        equals,
        cache: initial,
        has_fired_once: initial != NO_HANDLE,
        subscribers: HashMap::new(),
        subscribers_revision: 0,
        tracked,
        dirty: false,
        involved_this_wave: false,
        pause_state: PauseState::Active,
        pausable,
        replay_buffer_cap,
        replay_buffer: VecDeque::new(),
        terminal: None,
        has_received_teardown: false,
        resubscribable: false,
        meta_companions: Vec::new(),
        partial,
        topo_rank,
        received_mask: 0,
        involved_mask: 0,
        op_scratch: installed_scratch,
    };
    s.nodes.insert(id, rec);
    s.children.insert(id, HashSet::new());
    for &dep in &deps {
        s.children.entry(dep).or_default().insert(id);
    }
    // Publish partition membership while the state lock is still held.
    {
        let mut reg = self.registry.lock();
        reg.ensure_registered(id);
        for &dep in &deps {
            reg.union_nodes(id, dep);
        }
    }
    drop(s);
    self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
    Ok(id)
}
/// Registers a state node (no deps, no fn, no operator) with the given
/// initial value and `partial` flag; all other options take their defaults.
///
/// # Errors
/// Propagates any `RegisterError` from `register`.
pub fn register_state(
&self,
initial: HandleId,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: Vec::new(),
fn_or_op: None,
opts: NodeOpts {
initial,
partial,
..NodeOpts::default()
},
})
}
/// Registers a producer node: no deps, driven by `fn_id`, with `partial`
/// forced to `true` and every other option at its default.
///
/// # Errors
/// Propagates any `RegisterError` from `register`.
pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: Vec::new(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
partial: true,
..NodeOpts::default()
},
})
}
/// Registers a non-dynamic fn node over `deps`.
///
/// # Errors
/// Propagates any `RegisterError` from `register` (for example an unknown or
/// terminal dep).
pub fn register_derived(
&self,
deps: &[NodeId],
fn_id: FnId,
equals: EqualsMode,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
equals,
partial,
..NodeOpts::default()
},
})
}
/// Registers a fn node over `deps` with `is_dynamic` set, so it starts with
/// an empty tracked set.
///
/// # Errors
/// Propagates any `RegisterError` from `register`.
pub fn register_dynamic(
&self,
deps: &[NodeId],
fn_id: FnId,
equals: EqualsMode,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
equals,
partial,
is_dynamic: true,
..NodeOpts::default()
},
})
}
/// Builds the scratch state for `op` using this core's binding; see
/// `make_op_scratch_with_binding`.
fn make_op_scratch(
&self,
op: OperatorOp,
) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
Self::make_op_scratch_with_binding(&*self.binding, op)
}
/// Builds the initial scratch state for `op`.
///
/// Returns `Ok(None)` for operators that keep no scratch. Handles stored in
/// the new scratch are retained through `binding`: the seed for
/// `Scan`/`Reduce`, and the default of `Last` when it is a real handle.
///
/// # Errors
/// `OperatorSeedSentinel` when a `Scan`/`Reduce` seed is `NO_HANDLE`; nothing
/// has been retained in that case.
pub(crate) fn make_op_scratch_with_binding(
    binding: &dyn BindingBoundary,
    op: OperatorOp,
) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
    use crate::op_state::{
        DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
        TakeWhileState,
    };
    type Scratch = Box<dyn crate::op_state::OperatorScratch>;

    let scratch: Option<Scratch> = match op {
        // Operators without scratch.
        OperatorOp::Map { .. }
        | OperatorOp::Filter { .. }
        | OperatorOp::Combine { .. }
        | OperatorOp::WithLatestFrom { .. }
        | OperatorOp::Merge
        | OperatorOp::Tap { .. }
        | OperatorOp::Valve => None,

        // Seeded accumulators: the seed becomes the first accumulator value.
        OperatorOp::Scan { seed, .. } => {
            if seed == NO_HANDLE {
                return Err(RegisterError::OperatorSeedSentinel);
            }
            let state: Scratch = Box::new(ScanState { acc: seed });
            binding.retain_handle(seed);
            Some(state)
        }
        OperatorOp::Reduce { seed, .. } => {
            if seed == NO_HANDLE {
                return Err(RegisterError::OperatorSeedSentinel);
            }
            let state: Scratch = Box::new(ReduceState { acc: seed });
            binding.retain_handle(seed);
            Some(state)
        }

        // `Last` keeps its default; only a real handle needs retaining.
        OperatorOp::Last { default } => {
            let state: Scratch = Box::new(LastState {
                latest: NO_HANDLE,
                default,
            });
            if default != NO_HANDLE {
                binding.retain_handle(default);
            }
            Some(state)
        }

        // Operators whose scratch starts out empty.
        OperatorOp::DistinctUntilChanged { .. } => Some(Box::new(DistinctState::default())),
        OperatorOp::Pairwise { .. } => Some(Box::new(PairwiseState::default())),
        OperatorOp::Take { .. } => Some(Box::new(TakeState::default())),
        OperatorOp::Skip { .. } => Some(Box::new(SkipState::default())),
        OperatorOp::TakeWhile { .. } => Some(Box::new(TakeWhileState)),
        OperatorOp::TapFirst { .. } => Some(Box::new(crate::op_state::TapFirstState::default())),
        OperatorOp::Settle { .. } => Some(Box::new(crate::op_state::SettleState::default())),
    };
    Ok(scratch)
}
/// Registers an operator node over `deps`; `opts.equals` and `opts.partial`
/// are forwarded, every other option takes its default.
///
/// # Errors
/// Propagates any `RegisterError` from `register`, including
/// `OperatorWithoutDeps` for an empty `deps` slice and `OperatorSeedSentinel`
/// for a `NO_HANDLE` seed.
pub fn register_operator(
&self,
deps: &[NodeId],
op: OperatorOp,
opts: OperatorOpts,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Op(op)),
opts: NodeOpts {
equals: opts.equals,
partial: opts.partial,
..NodeOpts::default()
},
})
}
/// Subscribes `sink` to `node_id`, panicking instead of returning an error.
///
/// # Panics
/// Panics with the error's display text whenever `try_subscribe` fails.
#[allow(clippy::needless_pass_by_value)]
pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
    self.try_subscribe(node_id, sink)
        .unwrap_or_else(|e| panic!("{e}"))
}
/// Subscribes `sink` to `node_id`, reporting failure as an error value.
///
/// Sequence, as implemented below:
/// 1. Acquire the partition wave-owner guard for `node_id`; a failure is
///    propagated through `?`.
/// 2. Under the state lock: reject a terminal, non-resubscribable node with
///    `SubscribeError::TornDown`; reset a terminal, resubscribable node via
///    `reset_for_fresh_lifecycle`; build the handshake slices; register the
///    sink.
/// 3. After the state lock is released: wipe the binding context if a reset
///    happened, deliver the handshake to the sink, and activate the node when
///    this is the first subscriber of a non-state node.
/// 4. Drop the wave guard, drain deferred producer ops, and return the
///    `Subscription` (which only holds a weak reference to the state).
///
/// # Errors
/// `SubscribeError::TornDown` when the node is terminal and not
/// resubscribable, plus whatever `partition_wave_owner_lock_arc` reports.
///
/// # Panics
/// Re-raises any panic thrown by `sink` during the handshake, after removing
/// the just-registered subscriber. `require_node` panics on an unknown id.
#[allow(clippy::needless_pass_by_value)]
pub fn try_subscribe(
&self,
node_id: NodeId,
sink: Sink,
) -> Result<Subscription, SubscribeError> {
// Held for the whole subscribe sequence; released explicitly below.
let wave_guard = self.partition_wave_owner_lock_arc(node_id)?;
let (sub_id, tier_slices, needs_activation, did_reset) = {
let mut s = self.lock_state();
// Terminal + not resubscribable: the node cannot accept subscribers.
let should_reject = {
let rec = s.require_node(node_id);
!rec.resubscribable && rec.terminal.is_some()
};
if should_reject {
drop(s);
drop(wave_guard);
return Err(SubscribeError::TornDown { node: node_id });
}
let sub_id = s.alloc_sub_id();
// Terminal + resubscribable: start a fresh lifecycle before subscribing.
let needs_reset = {
let rec = s.require_node(node_id);
rec.resubscribable && rec.terminal.is_some()
};
if needs_reset {
self.reset_for_fresh_lifecycle(&mut s, node_id);
}
let (cache, is_state, first_subscriber) = {
let rec = s.require_node(node_id);
debug_assert!(
rec.terminal.is_none(),
"R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
);
debug_assert!(
!rec.has_received_teardown,
"R2.2.7.a invariant: reset clears has_received_teardown"
);
(rec.cache, rec.is_state(), rec.subscribers.is_empty())
};
// Handshake: one slice per message — `Start`, then the cached value
// (if any), then the replay buffer contents.
let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
tier_slices.push(vec![Message::Start]);
if cache != NO_HANDLE {
tier_slices.push(vec![Message::Data(cache)]);
}
let replay_handles: Vec<HandleId> = {
let rec = s.require_node(node_id);
// A missing or zero cap means nothing is replayed.
let cap = rec.replay_buffer_cap.unwrap_or(0);
if cap == 0 {
Vec::new()
} else {
let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
// The cache was already queued above; skip it when it is also the
// newest replay entry so it is not delivered twice.
if cache != NO_HANDLE && v.last() == Some(&cache) {
v.pop();
}
v
}
};
for h in &replay_handles {
tier_slices.push(vec![Message::Data(*h)]);
}
{
// Register before delivering the handshake; the revision counter is
// bumped on every change to the subscriber map.
let rec = s.require_node_mut(node_id);
rec.subscribers.insert(sub_id, sink.clone());
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
// Only the first subscriber of a non-state node triggers activation.
let needs_activation = first_subscriber && !is_state;
(sub_id, tier_slices, needs_activation, needs_reset)
};
// The state lock is released from here on.
if did_reset {
self.binding.wipe_ctx(node_id);
}
for slice in &tier_slices {
let sink_clone = sink.clone();
let slice_ref: &[Message] = slice;
// Catch a panicking sink so the registration can be rolled back before
// the panic continues to unwind.
let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
if let Err(panic_payload) = result {
{
let mut s = self.lock_state();
if let Some(rec) = s.nodes.get_mut(&node_id) {
rec.subscribers.remove(&sub_id);
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
}
std::panic::resume_unwind(panic_payload);
}
}
if needs_activation {
self.run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.activate_derived(&mut s, node_id);
});
}
// Release the wave guard before draining deferred producer ops.
drop(wave_guard);
self.drain_deferred_producer_ops();
Ok(Subscription {
state: Arc::downgrade(&self.state),
node_id,
sub_id,
})
}
/// Sets the `resubscribable` flag of `node_id`.
///
/// # Panics
/// Panics when the node already has subscribers, or (via `require_node_mut`)
/// when the node is unknown.
pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
    let mut state = self.lock_state();
    let node = state.require_node_mut(node_id);
    let has_subscribers = !node.subscribers.is_empty();
    assert!(
        !has_subscribers,
        "set_resubscribable: node already has subscribers; \
         configure resubscribable before any subscribe call"
    );
    node.resubscribable = resubscribable;
}
/// Returns `node_id` to a non-terminal, pre-wave state so it can be
/// subscribed to again.
///
/// Collects every handle the node record currently holds (its own and its
/// deps' error terminals, each dep's `data_batch` and `prev_data`, payloads
/// in the pause buffer, and the replay buffer), clears the matching fields,
/// rebuilds the operator scratch from the stored op, and finally releases the
/// collected handles through the binding. The node's `cache` is left in place.
///
/// # Panics
/// Panics if `make_op_scratch` rejects the stored op, or (via
/// `require_node_mut`) if the node is unknown.
fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
let rec = s.require_node_mut(node_id);
// Handles held by terminal slots and dep records.
let mut hs = Vec::new();
if let Some(TerminalKind::Error(h)) = rec.terminal {
hs.push(h);
}
for dr in &rec.dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
hs.push(h);
}
for &h in &dr.data_batch {
hs.push(h);
}
if dr.prev_data != NO_HANDLE {
hs.push(dr.prev_data);
}
}
// Handles drained from the pause buffer and from the replay buffer.
let mut pulled = Vec::new();
if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
for msg in buffer.drain(..) {
if let Some(h) = msg.payload_handle() {
pulled.push(h);
}
}
}
for h in rec.replay_buffer.drain(..) {
pulled.push(h);
}
rec.terminal = None;
// Only a state node that still has a cached value counts as "fired".
rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
rec.has_received_teardown = false;
for dr in &mut rec.dep_records {
dr.prev_data = NO_HANDLE;
dr.data_batch.clear();
dr.terminal = None;
dr.dirty = false;
dr.involved_this_wave = false;
}
rec.pause_state = PauseState::Active;
rec.involved_this_wave = false;
rec.dirty = false;
rec.received_mask = 0;
rec.involved_mask = 0;
if rec.is_dynamic {
rec.tracked.clear();
}
let prev_op = rec.op;
// Take the old scratch out so its handles can be released below.
let old = std::mem::take(&mut rec.op_scratch);
(prev_op, old, hs, pulled)
};
// Build the replacement scratch before releasing what the old one holds.
let new_scratch = match prev_op {
Some(op) => self
.make_op_scratch(op)
.expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
None => None,
};
if let Some(scratch) = old_scratch.as_mut() {
scratch.release_handles(&*self.binding);
}
drop(old_scratch);
// Also flush any scratch objects queued for release on the state.
let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
std::mem::take(&mut s.pending_scratch_release);
for mut scratch in queued {
scratch.release_handles(&*self.binding);
}
{
let rec = s.require_node_mut(node_id);
rec.op_scratch = new_scratch;
}
for h in handles_to_release {
self.binding.release_handle(h);
}
for h in pause_buffer_payloads {
self.binding.release_handle(h);
}
}
/// Activates `root` together with the upstream nodes it needs.
///
/// Phase 1 walks the dependency graph depth-first with an explicit stack and
/// records a post-order (`order`): a dep is descended into only when it is
/// not a state node, has no cached value, and has not fired yet. Phase 2
/// processes nodes in that order: producers are queued in `pending_fires`,
/// every other node is fed the cached value of each dep that has one.
pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut order: Vec<NodeId> = Vec::new();
// `(id, true)` marks the second visit, when all of the node's deps have
// already been emitted into `order`.
let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
while let Some((id, finalize)) = stack.pop() {
if finalize {
order.push(id);
continue;
}
if !visited.insert(id) {
continue;
}
stack.push((id, true));
let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
for dep_id in dep_ids {
let (dep_is_state, dep_cache, dep_has_fired) = {
let dep_rec = s.require_node(dep_id);
(dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
};
// Only deps with nothing to offer yet need activating themselves.
if !dep_is_state
&& dep_cache == NO_HANDLE
&& !dep_has_fired
&& !visited.contains(&dep_id)
{
stack.push((dep_id, false));
}
}
}
for &id in &order {
let (dep_ids, is_producer) = {
let rec = s.require_node(id);
(rec.dep_ids_vec(), rec.is_producer())
};
if is_producer {
// Producers are scheduled to fire rather than fed dep data.
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(id);
});
continue;
}
// Deliver each dep's cached value at the dep's positional index.
for (i, dep_id) in dep_ids.iter().copied().enumerate() {
let dep_cache = s.require_node(dep_id).cache;
if dep_cache != NO_HANDLE {
self.deliver_data_to_consumer(s, id, i, dep_cache);
}
}
}
}
/// Panicking convenience wrapper around `try_emit`.
///
/// # Panics
/// Panics with the `Display` text of the error when `try_emit` fails, and
/// whenever `try_emit` itself panics.
pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
    if let Err(e) = self.try_emit(node_id, new_handle) {
        panic!("{e}");
    }
}
/// Emits `new_handle` as DATA from a state or producer node.
///
/// If the node is already terminal the handle is released and the call
/// succeeds without running a wave. Otherwise the emission is committed
/// inside `try_run_wave_for`, whose error is returned unchanged.
///
/// # Panics
/// Panics when `new_handle` is `NO_HANDLE`, when the node is neither a state
/// nor a producer node, or (via `require_node`) when the node is unknown.
pub(crate) fn try_emit(
    &self,
    node_id: NodeId,
    new_handle: HandleId,
) -> Result<(), PartitionOrderViolation> {
    assert!(
        new_handle != NO_HANDLE,
        "NO_HANDLE is not a valid DATA payload (R1.2.4)"
    );
    // Inspect the node under the state lock; the lock ends with this block.
    let already_terminal = {
        let state = self.lock_state();
        let node = state.require_node(node_id);
        assert!(
            node.is_state() || node.is_producer(),
            "emit() is for state or producer nodes only; \
             derived/dynamic/operator emit via their fn return value"
        );
        node.terminal.is_some()
    };
    if already_terminal {
        self.binding.release_handle(new_handle);
        return Ok(());
    }
    self.try_run_wave_for(node_id, |this| {
        this.commit_emission(node_id, new_handle);
    })
}
/// Emits `new_handle`, or queues the emission when `try_emit` reports an
/// error. On the deferred path the handle is retained once before the
/// `DeferredProducerOp::Emit` is pushed.
pub fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
    if self.try_emit(node_id, new_handle).is_ok() {
        return;
    }
    self.binding.retain_handle(new_handle);
    let deferred = DeferredProducerOp::Emit {
        node_id,
        handle: new_handle,
    };
    self.push_deferred_producer_op(deferred);
}
/// Returns the cached handle of `node_id`. Panics (via `require_node`) when
/// the node is unknown.
#[must_use]
pub fn cache_of(&self, node_id: NodeId) -> HandleId {
    let state = self.lock_state();
    state.require_node(node_id).cache
}
/// Returns the node's `has_fired_once` flag. Panics (via `require_node`)
/// when the node is unknown.
#[must_use]
pub fn has_fired_once(&self, node_id: NodeId) -> bool {
    let state = self.lock_state();
    state.require_node(node_id).has_fired_once
}
/// Returns the ids of all registered nodes, in the node map's iteration
/// order.
#[must_use]
pub fn node_ids(&self) -> Vec<NodeId> {
    let state = self.lock_state();
    let mut ids = Vec::with_capacity(state.nodes.len());
    ids.extend(state.nodes.keys().copied());
    ids
}
/// Returns how many nodes are currently registered.
#[must_use]
pub fn node_count(&self) -> usize {
    let state = self.lock_state();
    state.nodes.len()
}
#[must_use]
pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
}
#[must_use]
pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
self.lock_state()
.nodes
.get(&node_id)
.map(NodeRecord::dep_ids_vec)
.unwrap_or_default()
}
/// Returns the terminal kind of `node_id`, or `None` when the node is
/// unknown or not terminal.
#[must_use]
pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
    let state = self.lock_state();
    state.nodes.get(&node_id)?.terminal
}
/// Returns the node's `dirty` flag; an unknown node reports `false`.
#[must_use]
pub fn is_dirty(&self, node_id: NodeId) -> bool {
    let state = self.lock_state();
    matches!(state.nodes.get(&node_id), Some(rec) if rec.dirty)
}
/// Returns a copy of the meta-companion list of `parent`; an unknown node
/// yields an empty vector.
#[must_use]
pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
    let state = self.lock_state();
    match state.nodes.get(&parent) {
        Some(rec) => rec.meta_companions.clone(),
        None => Vec::new(),
    }
}
}
impl Core {
/// Panicking convenience wrapper around `try_complete`.
///
/// # Panics
/// Panics with the `Display` text of the error when `try_complete` fails.
pub fn complete(&self, node_id: NodeId) {
    if let Err(e) = self.try_complete(node_id) {
        panic!("{e}");
    }
}
/// Terminates `node_id` with `TerminalKind::Complete` via
/// `try_emit_terminal`, returning its result unchanged.
pub(crate) fn try_complete(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
    let terminal = TerminalKind::Complete;
    self.try_emit_terminal(node_id, terminal)
}
/// Completes `node_id`, or queues a `DeferredProducerOp::Complete` when
/// `try_complete` reports an error.
pub fn complete_or_defer(&self, node_id: NodeId) {
    if self.try_complete(node_id).is_err() {
        self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
    }
}
/// Panicking convenience wrapper around `try_error`.
///
/// # Panics
/// Panics with the `Display` text of the error when `try_error` fails, and
/// whenever `try_error` itself panics.
pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
    if let Err(e) = self.try_error(node_id, error_handle) {
        panic!("{e}");
    }
}
pub(crate) fn try_error(
&self,
node_id: NodeId,
error_handle: HandleId,
) -> Result<(), PartitionOrderViolation> {
assert!(
error_handle != NO_HANDLE,
"NO_HANDLE is not a valid ERROR payload (R1.2.5)"
);
self.try_emit_terminal(node_id, TerminalKind::Error(error_handle))?;
self.binding.release_handle(error_handle);
Ok(())
}
pub fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
if self.try_error(node_id, error_handle).is_err() {
self.binding.retain_handle(error_handle);
self.push_deferred_producer_op(DeferredProducerOp::Error {
node_id,
handle: error_handle,
});
}
}
/// Runs a wave for `node_id` in which the node is terminated with
/// `terminal`.
///
/// # Panics
/// Panics when `node_id` is not registered.
fn try_emit_terminal(
    &self,
    node_id: NodeId,
    terminal: TerminalKind,
) -> Result<(), PartitionOrderViolation> {
    // The lock guard is a temporary of this statement only.
    let known = self.lock_state().nodes.contains_key(&node_id);
    assert!(known, "unknown node {node_id:?}");
    self.try_run_wave_for(node_id, |this| {
        let mut state = this.lock_state();
        this.terminate_node(&mut state, node_id, terminal);
    })
}
/// Marks `node_id` terminal and propagates the terminal downstream.
///
/// For every node taken from the work list that is not already terminal:
/// retain the error handle (if any) for the node's terminal slot, record the
/// terminal, drop the node from `pending_fires`, release the payload handles
/// in its pause and replay buffers, and queue the `Complete`/`Error` message.
/// Each child then records the terminal on the matching dep record (retaining
/// the error handle again). Once all of a child's deps are terminal, the child
/// is either queued to fire (`skips_auto_cascade`) or pushed onto the work
/// list with the terminal chosen by `pick_cascade_terminal`.
fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
// Explicit work list instead of recursion; cascaded children are appended.
let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
while let Some((id, t)) = work.pop() {
// Terminating is idempotent: an already-terminal node is skipped.
if s.require_node(id).terminal.is_some() {
continue; }
// The node's terminal slot holds its own retain on the error handle.
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
// Resubscribable nodes without subscribers are queued in `pending_wipes`.
let queue_wipe = {
let rec = s.require_node(id);
rec.resubscribable && rec.subscribers.is_empty()
};
s.require_node_mut(id).terminal = Some(t);
crate::batch::with_wave_state(|ws| {
if queue_wipe {
ws.pending_wipes.push(id);
}
ws.pending_fires.remove(&id);
});
// Buffered payloads are never delivered after termination; collect their
// handles (a paused node is switched back to `Active`) and release them.
let drained: Vec<HandleId> = {
let rec = s.require_node_mut(id);
let mut drained: Vec<HandleId> = Vec::new();
if rec.pause_state.is_paused() {
let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
if let PauseState::Paused { buffer, .. } = prev {
drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
}
}
drained.extend(rec.replay_buffer.drain(..));
drained
};
for h in drained {
self.binding.release_handle(h);
}
let msg = match t {
TerminalKind::Complete => Message::Complete,
TerminalKind::Error(h) => Message::Error(h),
};
self.queue_notify(s, id, msg);
// Snapshot the children so `s` can be mutated while iterating.
let child_ids: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(id);
let Some(idx) = dep_idx else { continue };
{
let child = s.require_node_mut(child_id);
// Keep the first terminal recorded for this dep.
if child.dep_records[idx].terminal.is_some() {
continue;
}
child.dep_records[idx].terminal = Some(t);
}
// The dep record's terminal slot holds its own retain as well.
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
let action = {
let child = s.require_node(child_id);
if child.terminal.is_some() {
ChildAction::None } else if child.all_deps_terminal() {
if child.skips_auto_cascade() {
ChildAction::QueueFire
} else {
ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
}
} else {
ChildAction::None
}
};
match action {
ChildAction::None => {}
ChildAction::Cascade(t_child) => {
work.push((child_id, t_child));
}
ChildAction::QueueFire => {
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(child_id);
});
}
}
}
}
}
}
/// What `terminate_node` does with a child after recording a dep terminal.
enum ChildAction {
/// Leave the child alone (it is already terminal, or still has live deps).
None,
/// Terminate the child too, with the carried terminal kind.
Cascade(TerminalKind),
/// Add the child to `pending_fires` instead of terminating it.
QueueFire,
}
fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
for dr in dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
return TerminalKind::Error(h);
}
}
TerminalKind::Complete
}
impl Core {
/// Panicking convenience wrapper around `try_teardown`.
///
/// # Panics
/// Panics with the `Display` text of the error when `try_teardown` fails.
pub fn teardown(&self, node_id: NodeId) {
    if let Err(e) = self.try_teardown(node_id) {
        panic!("{e}");
    }
}
/// Tears down `node_id`, or queues a deferred callback that calls
/// `teardown` later when `try_teardown` reports an error.
pub fn teardown_or_defer(&self, node_id: NodeId) {
    if self.try_teardown(node_id).is_ok() {
        return;
    }
    // The callback owns its own clone of the core.
    let core = self.clone();
    self.push_deferred_producer_op(DeferredProducerOp::Callback(Box::new(move || {
        core.teardown(node_id);
    })));
}
fn try_teardown(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
let torn_down_for_wave = torn_down.clone();
self.try_run_wave_for(node_id, move |this| {
let mut s = this.lock_state();
let collected = this.teardown_inner(&mut s, node_id);
torn_down_for_wave.lock().extend(collected);
})?;
let ids = std::mem::take(&mut *torn_down.lock());
for id in ids {
self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
}
Ok(())
}
/// Tears down `root`, its meta companions and its transitive children, and
/// returns the ids in the order their `Teardown` message was queued.
///
/// Uses an explicit stack. Because of the push order below, a node's meta
/// companions are processed first, then the node's own teardown is emitted,
/// then its children are visited. A node is visited at most once, guarded by
/// `has_received_teardown`.
fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
enum Action {
// First encounter of a node: mark it and schedule follow-up work.
Visit(NodeId),
// Terminate the node (if needed) and queue `Message::Teardown`.
EmitTeardown(NodeId),
}
let mut stack: Vec<Action> = vec![Action::Visit(root)];
let mut torn_down: Vec<NodeId> = Vec::new();
while let Some(action) = stack.pop() {
match action {
Action::Visit(id) => {
if s.require_node(id).has_received_teardown {
continue; }
s.require_node_mut(id).has_received_teardown = true;
let children: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
// Pushed in reverse so they pop in forward order; children sit at
// the bottom and are therefore handled last.
for &child in children.iter().rev() {
stack.push(Action::Visit(child));
}
stack.push(Action::EmitTeardown(id));
// Metas are pushed last, so they pop before this node's teardown.
let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
for &meta in metas.iter().rev() {
stack.push(Action::Visit(meta));
}
}
Action::EmitTeardown(id) => {
// A node that is not yet terminal is completed before teardown.
let already_terminal = s.require_node(id).terminal.is_some();
if !already_terminal {
self.terminate_node(s, id, TerminalKind::Complete);
}
self.queue_notify(s, id, Message::Teardown);
torn_down.push(id);
}
}
}
torn_down
}
/// Records `companion` as a meta companion of `parent`; adding the same
/// companion twice is a no-op.
///
/// # Panics
/// Panics when `parent == companion` or when either node is unknown.
pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
    assert!(parent != companion, "node cannot be its own meta companion");
    let mut state = self.lock_state();
    assert!(state.nodes.contains_key(&parent), "unknown parent {parent:?}");
    assert!(
        state.nodes.contains_key(&companion),
        "unknown companion {companion:?}"
    );
    let companions = &mut state.require_node_mut(parent).meta_companions;
    let already_linked = companions.iter().any(|&existing| existing == companion);
    if !already_linked {
        companions.push(companion);
    }
}
}
impl Core {
/// Runs a wave that invalidates `node_id` through `invalidate_inner`.
///
/// # Panics
/// Panics when `node_id` is not registered.
pub fn invalidate(&self, node_id: NodeId) {
    let known = self.lock_state().nodes.contains_key(&node_id);
    assert!(known, "unknown node {node_id:?}");
    self.run_wave_for(node_id, |this| {
        let mut state = this.lock_state();
        this.invalidate_inner(&mut state, node_id);
    });
}
/// Invalidates `node_id`, or queues a deferred callback that calls
/// `invalidate` later when `try_run_wave_for` reports an error.
///
/// # Panics
/// Panics when `node_id` is not registered.
pub fn invalidate_or_defer(&self, node_id: NodeId) {
    let known = self.lock_state().nodes.contains_key(&node_id);
    assert!(known, "unknown node {node_id:?}");

    let attempt = self.try_run_wave_for(node_id, |this| {
        let mut state = this.lock_state();
        this.invalidate_inner(&mut state, node_id);
    });
    if attempt.is_ok() {
        return;
    }
    // The callback owns its own clone of the core.
    let core = self.clone();
    self.push_deferred_producer_op(DeferredProducerOp::Callback(Box::new(move || {
        core.invalidate(node_id);
    })));
}
/// Clears the cache of `root` and, transitively, of its children.
///
/// A node without a cached value stops the propagation along that path. For
/// every node that is invalidated: its cache handle is released immediately,
/// an `OnInvalidate` cleanup hook is deferred (only when the insert into
/// `invalidate_hooks_fired_this_wave` returns `true`), and
/// `Message::Invalidate` is queued. For each child, the dep record for this
/// node has its `prev_data` and `data_batch` handles pushed to
/// `deferred_handle_releases` and cleared.
fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
let mut work: Vec<NodeId> = vec![root];
while let Some(node_id) = work.pop() {
let old_handle = s.require_node(node_id).cache;
// Nothing cached: nothing to invalidate here or further downstream.
if old_handle == NO_HANDLE {
continue;
}
s.require_node_mut(node_id).cache = NO_HANDLE;
self.binding.release_handle(old_handle);
crate::batch::with_wave_state(|ws| {
if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
ws.deferred_cleanup_hooks
.push((node_id, CleanupTrigger::OnInvalidate));
}
});
self.queue_notify(s, node_id, Message::Invalidate);
// Snapshot the children so `s` can be mutated while iterating.
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
// Copy the handles out first; they are released via the wave state
// rather than directly.
let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
let dr = &s.require_node(child_id).dep_records[idx];
(dr.prev_data, dr.data_batch.clone())
};
{
crate::batch::with_wave_state(|ws| {
if old_prev != NO_HANDLE {
ws.deferred_handle_releases.push(old_prev);
}
for h in batch_hs {
ws.deferred_handle_releases.push(h);
}
});
}
let child_rec = s.require_node_mut(child_id);
child_rec.dep_records[idx].prev_data = NO_HANDLE;
child_rec.dep_records[idx].data_batch.clear();
// `received_mask` is a u64, so only the first 64 deps have a bit.
if idx < 64 {
child_rec.received_mask &= !(1u64 << idx);
}
work.push(child_id);
}
}
}
}
}
}
/// Summary returned by `Core::resume` when the final pause lock is removed.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ResumeReport {
/// Number of buffered messages handed to the sinks (saturates at `u32::MAX`).
pub replayed: u32,
/// Count reported by the pause state when the lock is removed — presumably
/// messages discarded while paused; TODO confirm against `PauseState`.
pub dropped: u32,
}
impl Core {
/// Adds `lock_id` to the pause locks of `node_id`.
///
/// The call is a successful no-op for a terminal node and for a node whose
/// pausable mode is `PausableMode::Off`.
///
/// # Errors
/// `PauseError::UnknownNode` when the node is not registered.
pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
    let mut state = self.lock_state();
    let Some(node) = state.nodes.get_mut(&node_id) else {
        return Err(PauseError::UnknownNode(node_id));
    };
    let ignores_pause = node.terminal.is_some() || node.pausable == PausableMode::Off;
    if !ignores_pause {
        node.pause_state.add_lock(lock_id);
    }
    Ok(())
}
/// Removes pause lock `lock_id` from `node_id`.
///
/// Returns `Ok(None)` when the node's pausable mode is `Off` or when
/// `remove_lock` returns `None`. Otherwise the buffered messages are
/// delivered to every current subscriber, their payload handles are released,
/// and a `ResumeReport` is returned. In `PausableMode::Default`, a pending
/// wave recorded while paused is re-armed by queueing the node in
/// `pending_fires` and running a wave for it.
///
/// # Errors
/// `PauseError::UnknownNode` when the node is not registered.
pub fn resume(
&self,
node_id: NodeId,
lock_id: LockId,
) -> Result<Option<ResumeReport>, PauseError> {
let (sinks, messages, dropped, pending_wave_for_default) = {
let mut s = self.lock_state();
let rec = s
.nodes
.get_mut(&node_id)
.ok_or(PauseError::UnknownNode(node_id))?;
if rec.pausable == PausableMode::Off {
return Ok(None);
}
let was_default_mode = rec.pausable == PausableMode::Default;
// Take the pending-wave flag up front; it is restored below when the
// lock removal yields nothing.
let pending_wave = if was_default_mode {
rec.pause_state.take_pending_wave()
} else {
false
};
let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
if pending_wave {
rec.pause_state.mark_pending_wave();
}
return Ok(None);
};
// Snapshot sinks and messages so delivery happens outside the state lock.
let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
let messages: Vec<Message> = buffer.into_iter().collect();
if pending_wave && was_default_mode {
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(node_id);
});
}
(sinks, messages, dropped, pending_wave && was_default_mode)
};
let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
if !messages.is_empty() {
for sink in &sinks {
sink(&messages);
}
// Release the buffered payload handles once every sink has seen them.
for msg in &messages {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
if pending_wave_for_default {
// The closure is empty: the node was already queued in `pending_fires`.
self.run_wave_for(node_id, |_this| {
});
}
Ok(Some(ResumeReport { replayed, dropped }))
}
/// Reports whether `node_id` is currently paused. Panics (via
/// `require_node`) when the node is unknown.
#[must_use]
pub fn is_paused(&self, node_id: NodeId) -> bool {
    let state = self.state.lock();
    state.require_node(node_id).pause_state.is_paused()
}
/// Returns the pause state's lock count for `node_id`. Panics (via
/// `require_node`) when the node is unknown.
#[must_use]
pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
    let state = self.state.lock();
    state.require_node(node_id).pause_state.lock_count()
}
/// Reports whether `lock_id` is among the pause locks of `node_id`. Panics
/// (via `require_node`) when the node is unknown.
#[must_use]
pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
    let state = self.state.lock();
    state.require_node(node_id).pause_state.contains_lock(lock_id)
}
}
/// Reasons `Core::set_deps` refuses to rewire a node.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum SetDepsError {
/// `n` itself appears in the requested dependency list.
#[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
SelfDependency { n: NodeId },
/// Adding `added_dep` as a dependency of `n` would close a cycle; `path` is
/// the existing path that was found between the two nodes.
#[error(
"set_deps({n:?}, ...): cycle would form via path {path:?} \
(adding {added_dep:?} → {n:?} closes the loop)"
)]
WouldCreateCycle {
n: NodeId,
added_dep: NodeId,
path: Vec<NodeId>,
},
/// The target node or one of the requested deps is not registered.
#[error("set_deps: unknown node {0:?}")]
UnknownNode(NodeId),
/// The target is a state or producer node and has no rewirable deps.
#[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
NotComputeNode(NodeId),
/// The target node is already terminal.
#[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
TerminalNode { n: NodeId },
/// A newly added dep is terminal and not resubscribable.
#[error(
"set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
either mark it resubscribable before terminate, or remove the dep from new_deps"
)]
TerminalDep { n: NodeId, dep: NodeId },
/// `n` is listed in `currently_firing`, i.e. the call is re-entrant.
#[error(
"set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
(set_deps from inside the firing node's own fn would corrupt the \
Dynamic `tracked` indices snapshot taken before invoke_fn). \
Schedule the rewire outside this fire scope."
)]
ReentrantOnFiringNode { n: NodeId },
/// The rewire would merge or split the partition of the firing node
/// `firing` while a wave is in progress.
#[error(
"set_deps({n:?}, ...): rejected — would migrate the partition of \
currently-firing node {firing:?} mid-wave (union/split during \
fire would invalidate the held wave_owner Arc). Schedule the \
rewire outside the wave."
)]
PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
}
impl Core {
/// Replaces the dependency list of compute node `n` with `new_deps`.
///
/// All validation runs first and leaves the graph untouched on failure. On
/// success the method, in order:
/// 1. rebuilds the dep records (records of kept deps are carried over, new
///    deps get a fresh record) and collects the handles held by removed
///    records — error terminal, `data_batch` and `prev_data`;
/// 2. recomputes `topo_rank`, `received_mask`, `involved_mask` and `tracked`;
/// 3. updates the `children` index and the partition registry (union for
///    added edges, split when a removed edge disconnects the component);
/// 4. flags `n` for auto-resolve when it ends up with no deps but still has an
///    unpaired `Dirty` queued;
/// 5. after releasing the state lock: runs the `OnRerun` cleanup for dynamic
///    nodes that had fired, fires `TopologyEvent::DepsChanged`, delivers the
///    cached value of every added dep inside a wave, and releases the handles
///    collected in step 1.
///
/// A call that neither adds nor removes a dep returns `Ok(())` without
/// touching anything.
///
/// # Errors
/// See [`SetDepsError`]; checks run in this order: unknown `n`, not a compute
/// node, terminal node, re-entrant call on a firing node, self-dependency,
/// unknown dep, cycle, terminal non-resubscribable dep, partition migration
/// during a fire.
// NOTE(review): duplicate ids in `new_deps` are not rejected; each duplicate
// copies the same old record (and its handles) — confirm callers never pass
// duplicates.
#[allow(clippy::too_many_lines)]
pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
    let mut s = self.lock_state();

    // ---- Validation -------------------------------------------------------
    let (is_state, is_producer, is_terminal) = {
        let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
        (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
    };
    if is_state || is_producer {
        return Err(SetDepsError::NotComputeNode(n));
    }
    if is_terminal {
        return Err(SetDepsError::TerminalNode { n });
    }
    if s.currently_firing.contains(&n) {
        return Err(SetDepsError::ReentrantOnFiringNode { n });
    }
    if new_deps.contains(&n) {
        return Err(SetDepsError::SelfDependency { n });
    }
    for &d in new_deps {
        if !s.nodes.contains_key(&d) {
            return Err(SetDepsError::UnknownNode(d));
        }
    }
    let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
    let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
    let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
    // An added edge `d -> n` closes a cycle when `d` is already reachable
    // from `n` through the children index.
    for &d in &added {
        if let Some(path) = self.path_from_to(&s, n, d) {
            return Err(SetDepsError::WouldCreateCycle {
                n,
                added_dep: d,
                path,
            });
        }
    }
    for &d in &added {
        let dep_rec = s.require_node(d);
        if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
            return Err(SetDepsError::TerminalDep { n, dep: d });
        }
    }
    let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();

    // While nodes are firing, refuse any rewire that would merge or split a
    // partition one of them belongs to.
    let currently_firing_snapshot: Vec<NodeId> = s.currently_firing.clone();
    if !currently_firing_snapshot.is_empty() && (!added.is_empty() || !removed.is_empty()) {
        let mut reg = self.registry.lock();
        let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> =
            currently_firing_snapshot
                .iter()
                .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
                .collect();
        if !firing_with_partition.is_empty() {
            let part_n = reg.partition_of(n);
            for &added_dep in &added {
                let part_added = reg.partition_of(added_dep);
                // Same partition already: adding the edge merges nothing.
                if part_n == part_added {
                    continue;
                }
                let affected = [part_n, part_added];
                if let Some(&(firing, _)) = firing_with_partition
                    .iter()
                    .find(|(_, p)| affected.contains(&Some(*p)))
                {
                    return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
                }
            }
            let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
            for &removed_dep in &removed {
                let part_removed = reg.partition_of(removed_dep);
                // Simulate the graph after the rewire: skip the removed edge,
                // include the added ones.
                let visited = walk_undirected_dep_graph(
                    &s,
                    removed_dep,
                    Some((removed_dep, n)),
                    &added_edges,
                );
                let would_disconnect = !visited.contains(&n);
                if would_disconnect {
                    if let Some(&(firing, _)) = firing_with_partition
                        .iter()
                        .find(|(_, p)| Some(*p) == part_removed)
                    {
                        return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
                    }
                }
            }
        }
    }
    if added.is_empty() && removed.is_empty() {
        return Ok(());
    }

    // ---- Rebuild dep records ----------------------------------------------
    let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
    let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
    let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
        let rec = s.require_node(n);
        let old_by_node: HashMap<NodeId, &DepRecord> =
            rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
        let new_records: Vec<DepRecord> = new_deps_vec
            .iter()
            .map(|&d| {
                if let Some(old) = old_by_node.get(&d) {
                    // Kept dep: carry its state (and handle ownership) over.
                    DepRecord {
                        node: d,
                        prev_data: old.prev_data,
                        dirty: old.dirty,
                        involved_this_wave: old.involved_this_wave,
                        data_batch: old.data_batch.clone(),
                        terminal: old.terminal,
                    }
                } else {
                    DepRecord::new(d)
                }
            })
            .collect();
        // Every handle owned by a discarded record must be released, the same
        // set `reset_for_fresh_lifecycle` and `Drop for CoreState` release:
        // error terminal, pending batch and `prev_data`.
        let mut to_release: Vec<HandleId> = Vec::new();
        for d in &removed {
            if let Some(old) = old_by_node.get(d) {
                if let Some(TerminalKind::Error(h)) = old.terminal {
                    to_release.push(h);
                }
                to_release.extend(old.data_batch.iter().copied());
                if old.prev_data != NO_HANDLE {
                    to_release.push(old.prev_data);
                }
            }
        }
        (new_records, to_release)
    };
    // One above the highest-ranked dep; 0 when there are no deps.
    let new_topo_rank = if new_deps_vec.is_empty() {
        0
    } else {
        new_deps_vec
            .iter()
            .filter(|&&d| d != n)
            .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
            .max()
            .unwrap_or(0)
            .saturating_add(1)
    };
    let fire_set_deps_on_rerun;
    {
        let rec = s.require_node_mut(n);
        // Captured before `has_fired_once` is cleared for dynamic nodes below.
        fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
        rec.dep_records = new_dep_records;
        rec.topo_rank = new_topo_rank;
        // Dep indices changed, so both masks are rebuilt from the new records
        // (only the first 64 deps have a bit).
        rec.received_mask = 0;
        rec.involved_mask = 0;
        for (i, dr) in rec.dep_records.iter().enumerate() {
            if i < 64 {
                if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
                    rec.received_mask |= 1u64 << i;
                }
                if dr.involved_this_wave {
                    rec.involved_mask |= 1u64 << i;
                }
            }
        }
        if rec.is_dynamic {
            rec.tracked.clear();
            rec.has_fired_once = false;
        } else {
            rec.tracked = (0..new_deps_vec.len()).collect();
        }
    }

    // ---- Children index and partition registry ----------------------------
    for &removed_dep in &removed {
        if let Some(set) = s.children.get_mut(&removed_dep) {
            set.remove(&n);
        }
    }
    for &added_dep in &added {
        s.children.entry(added_dep).or_default().insert(n);
    }
    let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
    {
        let mut reg = self.registry.lock();
        for &added_dep in &added {
            reg.union_nodes(n, added_dep);
        }
        for &removed_dep in &removed {
            // Still connected through another path: no split needed.
            let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
            if visited.contains(&n) {
                continue;
            }
            let original_root = reg.find(removed_dep);
            let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
            let component_nodes: Vec<NodeId> = snapshot_keys
                .into_iter()
                .filter(|&node| reg.find(node) == original_root)
                .collect();
            let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
            let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
            for &node in &component_nodes {
                if let Some(rec) = s.nodes.get(&node) {
                    for d in rec.dep_records.iter().map(|r| r.node) {
                        if component_set.contains(&d) {
                            edges_in_component.push((d, node));
                        }
                    }
                }
            }
            let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
            reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
            reg.on_edge_removed(n, removed_dep);
        }
    }

    // ---- Auto-resolve for a node left without deps -------------------------
    if new_deps_vec.is_empty() {
        crate::batch::with_wave_state(|ws| {
            // Count `Dirty` messages not yet matched by a settling message.
            let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
                let mut unpaired: i32 = 0;
                for m in entry.iter_messages() {
                    match m {
                        crate::message::Message::Dirty => unpaired += 1,
                        crate::message::Message::Data(_)
                        | crate::message::Message::Resolved
                        | crate::message::Message::Complete
                        | crate::message::Message::Error(_)
                            if unpaired > 0 =>
                        {
                            unpaired -= 1;
                        }
                        _ => {}
                    }
                }
                unpaired > 0
            });
            if needs_auto_resolve {
                ws.pending_auto_resolve.insert(n);
            }
        });
    }

    // ---- Work done without the state lock ----------------------------------
    drop(s);
    if fire_set_deps_on_rerun {
        self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
    }
    // `new_deps_vec` is not needed afterwards, so it is moved into the event.
    self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
        node: n,
        old_deps: old_deps_vec,
        new_deps: new_deps_vec,
    });
    if !added_for_wave.is_empty() {
        self.run_wave_for(n, |this| {
            let mut s = this.lock_state();
            // The node may have vanished or terminated since the lock was
            // released above.
            if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
                return;
            }
            for added_dep in &added_for_wave {
                let cache = match s.nodes.get(added_dep) {
                    Some(rec) => rec.cache,
                    None => continue,
                };
                if cache == NO_HANDLE {
                    continue;
                }
                let dep_idx = s.require_node(n).dep_index_of(*added_dep);
                if let Some(idx) = dep_idx {
                    this.deliver_data_to_consumer(&mut s, n, idx, cache);
                }
            }
        });
    }
    for h in removed_handles {
        self.binding.release_handle(h);
    }
    Ok(())
}
/// Searches the `children` index depth-first for a path from `from` to
/// `to` and returns the nodes along it (both endpoints included), or `None`
/// when `to` is unreachable. `from == to` yields the single-node path.
fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
    if from == to {
        return Some(vec![from]);
    }
    let mut seen: HashSet<NodeId> = HashSet::new();
    // Each frontier entry carries the full trail that led to its node.
    let mut frontier: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
    while let Some((node, trail)) = frontier.pop() {
        if seen.contains(&node) {
            continue;
        }
        seen.insert(node);
        if node == to {
            return Some(trail);
        }
        let Some(children) = s.children.get(&node) else {
            continue;
        };
        frontier.extend(children.iter().map(|&child| {
            let mut extended = Vec::with_capacity(trail.len() + 1);
            extended.extend_from_slice(&trail);
            extended.push(child);
            (child, extended)
        }));
    }
    None
}
}
impl CoreState {
    /// Hands out the next node id and advances the counter.
    fn alloc_node_id(&mut self) -> NodeId {
        let raw = self.next_node_id;
        self.next_node_id += 1;
        NodeId::new(raw)
    }

    /// Hands out the next subscription id and advances the counter.
    fn alloc_sub_id(&mut self) -> SubscriptionId {
        let raw = self.next_subscription_id;
        self.next_subscription_id += 1;
        SubscriptionId(raw)
    }

    /// Resets all per-wave bookkeeping at the end of a wave.
    ///
    /// For every dep record with a non-empty batch, the newest handle becomes
    /// `prev_data`; the older batch entries and the previous `prev_data` are
    /// pushed to `ws.deferred_handle_releases`.
    pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
        self.currently_firing.clear();
        for rec in self.nodes.values_mut() {
            rec.dirty = false;
            rec.involved_this_wave = false;
            rec.involved_mask = 0;
            for dr in &mut rec.dep_records {
                dr.dirty = false;
                dr.involved_this_wave = false;
                // Nothing arrived from this dep during the wave.
                let Some((&newest, older)) = dr.data_batch.split_last() else {
                    continue;
                };
                for &h in older {
                    ws.deferred_handle_releases.push(h);
                }
                if dr.prev_data != NO_HANDLE {
                    ws.deferred_handle_releases.push(dr.prev_data);
                }
                dr.prev_data = newest;
                dr.data_batch.clear();
            }
        }
    }

    /// Looks up a node record.
    ///
    /// # Panics
    /// Panics when `id` is not registered.
    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
        match self.nodes.get(&id) {
            Some(rec) => rec,
            None => panic!("unknown node {id:?}"),
        }
    }

    /// Looks up a node record for mutation.
    ///
    /// # Panics
    /// Panics when `id` is not registered.
    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
        match self.nodes.get_mut(&id) {
            Some(rec) => rec,
            None => panic!("unknown node {id:?}"),
        }
    }
}
impl Drop for CoreState {
    /// Releases every handle still held by the state: per node the cache, the
    /// error terminal, each dep record's error terminal / batch / `prev_data`,
    /// buffered pause payloads, the replay buffer and the operator scratch;
    /// then any scratch objects still queued in `pending_scratch_release`.
    fn drop(&mut self) {
        let binding = &self.binding;
        for rec in self.nodes.values_mut() {
            if rec.cache != NO_HANDLE {
                binding.release_handle(rec.cache);
            }
            if let Some(TerminalKind::Error(h)) = rec.terminal {
                binding.release_handle(h);
            }
            for dr in &rec.dep_records {
                if let Some(TerminalKind::Error(h)) = dr.terminal {
                    binding.release_handle(h);
                }
                for h in dr.data_batch.iter().copied() {
                    binding.release_handle(h);
                }
                if dr.prev_data != NO_HANDLE {
                    binding.release_handle(dr.prev_data);
                }
            }
            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
                for msg in buffer {
                    if let Some(h) = msg.payload_handle() {
                        binding.release_handle(h);
                    }
                }
            }
            for h in rec.replay_buffer.iter().copied() {
                binding.release_handle(h);
            }
            if let Some(scratch) = &mut rec.op_scratch {
                scratch.release_handles(&**binding);
            }
        }
        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
            std::mem::take(&mut self.pending_scratch_release);
        for mut scratch in queued {
            scratch.release_handles(&**binding);
        }
    }
}