use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use parking_lot::Mutex;
use smallvec::SmallVec;
use thiserror::Error;
use crate::boundary::{BindingBoundary, CleanupTrigger};
use crate::clock::monotonic_ns;
use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
use crate::message::Message;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TerminalKind {
Complete,
Error(HandleId),
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum NodeKind {
State,
Producer,
Derived,
Dynamic,
Operator(OperatorOp),
}
impl NodeKind {
pub(crate) fn skips_auto_cascade(self) -> bool {
matches!(
self,
NodeKind::Operator(
OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
)
)
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OperatorOp {
Map { fn_id: FnId },
Filter { fn_id: FnId },
Scan { fn_id: FnId, seed: HandleId },
Reduce { fn_id: FnId, seed: HandleId },
DistinctUntilChanged { equals_fn_id: FnId },
Pairwise { fn_id: FnId },
Combine { pack_fn: FnId },
WithLatestFrom { pack_fn: FnId },
Merge,
Take { count: u32 },
Skip { count: u32 },
TakeWhile { fn_id: FnId },
Last { default: HandleId },
Tap { fn_id: FnId },
TapFirst { fn_id: FnId },
Valve,
Settle {
quiet_waves: u32,
max_waves: Option<u32>,
},
}
#[derive(Copy, Clone, Debug)]
pub struct OperatorOpts {
pub equals: EqualsMode,
pub partial: bool,
}
impl Default for OperatorOpts {
fn default() -> Self {
Self {
equals: EqualsMode::Identity,
partial: false,
}
}
}
#[derive(Copy, Clone, Debug)]
pub enum NodeFnOrOp {
Fn(FnId),
Op(OperatorOp),
}
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
#[default]
Default,
ResumeAll,
Off,
}
#[derive(Copy, Clone, Debug)]
pub struct NodeOpts {
pub initial: HandleId,
pub equals: EqualsMode,
pub partial: bool,
pub is_dynamic: bool,
pub pausable: PausableMode,
pub replay_buffer: Option<usize>,
pub terminal_as_real_input: bool,
}
impl Default for NodeOpts {
fn default() -> Self {
Self {
initial: NO_HANDLE,
equals: EqualsMode::Identity,
partial: false,
is_dynamic: false,
pausable: PausableMode::Default,
replay_buffer: None,
terminal_as_real_input: false,
}
}
}
#[derive(Clone, Debug)]
pub struct NodeRegistration {
pub deps: Vec<NodeId>,
pub fn_or_op: Option<NodeFnOrOp>,
pub opts: NodeOpts,
}
#[derive(Copy, Clone, Debug)]
pub enum EqualsMode {
Identity,
Custom(FnId),
}
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct SubscriptionId(u64);
#[allow(clippy::too_many_lines)] pub(crate) fn unsubscribe_sink(core: &Core, node_id: NodeId, sub_id: SubscriptionId) {
let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
let mut s = St::new(core);
let Some(rec) = s.nodes.get_mut(&node_id) else {
return;
};
rec.subscribers.remove(&sub_id);
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
let last = rec.subscribers.is_empty();
let producer = rec.is_producer();
let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
let binding = if last { Some(s.binding.clone()) } else { None };
(last, producer, user_cleanup, fire_wipe, binding)
};
if was_last_sub {
if let Some(binding) = binding {
if has_user_cleanup {
binding.cleanup_for(node_id, CleanupTrigger::OnDeactivation);
}
if is_producer {
let unsub = |up_node: NodeId, up_sub: SubscriptionId| {
unsubscribe_sink(core, up_node, up_sub);
};
binding.producer_deactivate(node_id, &unsub);
}
if fire_wipe {
binding.wipe_ctx(node_id);
}
let (to_release, scratch_to_release): (
Vec<HandleId>,
Option<Box<dyn crate::op_state::OperatorScratch>>,
) = {
let mut s = St::new(core);
if let Some(rec) = s.nodes.get_mut(&node_id) {
if !rec.subscribers.is_empty() {
return;
}
let mut handles: Vec<HandleId> = Vec::new();
for dr in &mut rec.dep_records {
if dr.prev_data != NO_HANDLE {
handles.push(dr.prev_data);
dr.prev_data = NO_HANDLE;
}
for h in dr.data_batch.drain(..) {
handles.push(h);
}
if let Some(TerminalKind::Error(h)) = dr.terminal {
handles.push(h);
}
dr.terminal = None;
dr.dirty = false;
dr.involved_this_wave = false;
}
if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
for msg in buffer.drain(..) {
if let Some(h) = msg.payload_handle() {
handles.push(h);
}
}
}
for h in rec.replay_buffer.drain(..) {
handles.push(h);
}
rec.pause_state = PauseState::Active;
let is_compute = rec.fn_id.is_some() || rec.op.is_some();
if is_compute && rec.cache != NO_HANDLE {
handles.push(rec.cache);
rec.cache = NO_HANDLE;
}
rec.has_fired_once = false;
rec.dirty = false;
rec.involved_this_wave = false;
rec.received_mask = 0;
rec.involved_mask = 0;
if rec.is_dynamic {
rec.tracked.clear();
}
let scratch = if !rec.resubscribable {
std::mem::take(&mut rec.op_scratch)
} else if let Some(op) = rec.op {
let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
.expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
if let Some(old_box) = old {
s.shared.pending_scratch_release.push(old_box);
}
None
} else {
None
};
(handles, scratch)
} else {
(Vec::new(), None)
}
};
for h in to_release {
binding.release_handle(h);
}
if let Some(mut scratch) = scratch_to_release {
scratch.release_handles(&*binding);
}
}
}
}
pub type Sink = Arc<dyn Fn(&[Message])>;
#[derive(Debug)]
pub(crate) enum PauseState {
Active,
Paused {
locks: SmallVec<[LockId; 2]>,
buffer: VecDeque<Message>,
dropped: u32,
started_at_ns: u64,
overflow_reported: bool,
pending_wave: bool,
},
}
impl PauseState {
pub(crate) fn is_paused(&self) -> bool {
matches!(self, Self::Paused { .. })
}
fn lock_count(&self) -> usize {
match self {
Self::Active => 0,
Self::Paused { locks, .. } => locks.len(),
}
}
fn contains_lock(&self, lock_id: LockId) -> bool {
match self {
Self::Active => false,
Self::Paused { locks, .. } => locks.contains(&lock_id),
}
}
fn add_lock(&mut self, lock_id: LockId) {
match self {
Self::Active => {
let mut locks = SmallVec::new();
locks.push(lock_id);
*self = Self::Paused {
locks,
buffer: VecDeque::new(),
dropped: 0,
started_at_ns: monotonic_ns(),
overflow_reported: false,
pending_wave: false,
};
}
Self::Paused { locks, .. } => {
if !locks.contains(&lock_id) {
locks.push(lock_id);
}
}
}
}
pub(crate) fn mark_pending_wave(&mut self) {
if let Self::Paused { pending_wave, .. } = self {
*pending_wave = true;
}
}
pub(crate) fn take_pending_wave(&mut self) -> bool {
if let Self::Paused { pending_wave, .. } = self {
std::mem::replace(pending_wave, false)
} else {
false
}
}
fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
match self {
Self::Active => None,
Self::Paused { locks, .. } => {
if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
locks.swap_remove(idx);
}
if locks.is_empty() {
let prev = std::mem::replace(self, Self::Active);
if let Self::Paused {
buffer, dropped, ..
} = prev
{
return Some((buffer, dropped));
}
}
None
}
}
}
pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
let mut result = PushBufferedResult::default();
if let Self::Paused {
buffer,
dropped,
overflow_reported,
..
} = self
{
buffer.push_back(msg);
if let Some(c) = cap {
while buffer.len() > c {
if let Some(dropped_msg) = buffer.pop_front() {
result.dropped_msgs.push(dropped_msg);
}
*dropped = dropped.saturating_add(1);
}
}
if !result.dropped_msgs.is_empty() && !*overflow_reported {
*overflow_reported = true;
result.first_overflow_this_cycle = true;
}
}
result
}
pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
match self {
Self::Active => None,
Self::Paused {
dropped,
started_at_ns,
..
} => {
let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
Some((*dropped, lock_held_ns))
}
}
}
}
#[derive(Default)]
pub(crate) struct PushBufferedResult {
pub(crate) dropped_msgs: Vec<Message>,
pub(crate) first_overflow_this_cycle: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
pub(crate) node_id: NodeId,
pub(crate) dropped_count: u32,
pub(crate) configured_max: usize,
pub(crate) lock_held_ns: u64,
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[error("vestigial PartitionOrderViolation (never constructed post-§7)")]
pub struct PartitionOrderViolation {
pub attempted: u64,
pub max_held: u64,
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SubscribeError {
#[error(transparent)]
PartitionOrderViolation(#[from] PartitionOrderViolation),
#[error(
"subscribe({node:?}): node is non-resubscribable and has terminated; \
the stream is permanently over (R2.2.7.b)"
)]
TornDown {
node: NodeId,
},
}
pub enum DeferredProducerOp {
Emit { node_id: NodeId, handle: HandleId },
Complete { node_id: NodeId },
Error { node_id: NodeId, handle: HandleId },
Callback(Box<dyn FnOnce() + Send>),
}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum PauseError {
#[error("pause/resume: unknown node {0:?}")]
UnknownNode(NodeId),
}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum UpError {
#[error("up: unknown node {0:?}")]
UnknownNode(NodeId),
#[error(
"up: tier {tier} is forbidden upstream — value (tier 3) and \
terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
)]
TierForbidden { tier: u8 },
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
#[error("register: unknown dep {0:?}")]
UnknownDep(NodeId),
#[error(
"register: operator nodes require at least one dep — \
use register_producer for subscription-managed combinators"
)]
OperatorWithoutDeps,
#[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
InitialOnlyForStateNodes,
#[error(
"register: dep {0:?} is terminal and not resubscribable; \
mark it resubscribable before terminating, or remove it from the dep list"
)]
TerminalDep(NodeId),
#[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
OperatorSeedSentinel,
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
#[error("set_pausable_mode: unknown node {0:?}")]
UnknownNode(NodeId),
#[error(
"set_pausable_mode: cannot change pausable mode while paused; \
resume all locks first"
)]
WhilePaused,
}
pub(crate) struct DepRecord {
pub(crate) node: NodeId,
pub(crate) prev_data: HandleId,
pub(crate) dirty: bool,
pub(crate) involved_this_wave: bool,
pub(crate) data_batch: SmallVec<[HandleId; 1]>,
pub(crate) terminal: Option<TerminalKind>,
}
impl DepRecord {
fn new(node: NodeId) -> Self {
Self {
node,
prev_data: NO_HANDLE,
dirty: false,
involved_this_wave: false,
data_batch: SmallVec::new(),
terminal: None,
}
}
}
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
pub(crate) dep_records: Vec<DepRecord>,
pub(crate) fn_id: Option<FnId>,
pub(crate) op: Option<OperatorOp>,
pub(crate) is_dynamic: bool,
pub(crate) equals: EqualsMode,
pub(crate) cache: HandleId,
pub(crate) has_fired_once: bool,
pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
pub(crate) subscribers_revision: u64,
pub(crate) tracked: HashSet<usize>,
pub(crate) dirty: bool,
pub(crate) involved_this_wave: bool,
pub(crate) pause_state: PauseState,
pub(crate) pausable: PausableMode,
pub(crate) replay_buffer_cap: Option<usize>,
pub(crate) replay_buffer: VecDeque<HandleId>,
pub(crate) terminal: Option<TerminalKind>,
pub(crate) has_received_teardown: bool,
pub(crate) resubscribable: bool,
pub(crate) meta_companions: Vec<NodeId>,
pub(crate) partial: bool,
pub(crate) terminal_as_real_input: bool,
pub(crate) topo_rank: u32,
pub(crate) received_mask: u64,
pub(crate) involved_mask: u64,
pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
impl NodeRecord {
pub(crate) fn is_state(&self) -> bool {
self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
}
pub(crate) fn is_producer(&self) -> bool {
self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
}
#[allow(dead_code)] pub(crate) fn is_compute(&self) -> bool {
!self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
}
#[allow(dead_code)] pub(crate) fn is_operator(&self) -> bool {
self.op.is_some()
}
pub(crate) fn skips_auto_cascade(&self) -> bool {
match self.op {
Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
None => self.terminal_as_real_input,
}
}
pub(crate) fn kind(&self) -> NodeKind {
if let Some(op) = self.op {
NodeKind::Operator(op)
} else if self.dep_records.is_empty() {
if self.fn_id.is_some() {
NodeKind::Producer
} else {
NodeKind::State
}
} else if self.is_dynamic {
NodeKind::Dynamic
} else {
NodeKind::Derived
}
}
pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
self.dep_records.iter().map(|r| r.node)
}
pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
self.dep_ids().collect()
}
pub(crate) fn has_sentinel_deps(&self) -> bool {
let n = self.dep_records.len();
if n == 0 {
return false;
}
if n <= 64 {
let full_mask = if n == 64 { u64::MAX } else { (1u64 << n) - 1 };
self.received_mask != full_mask
} else {
self.dep_records
.iter()
.any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
}
}
pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
self.dep_records.iter().position(|r| r.node == dep_id)
}
pub(crate) fn all_deps_terminal(&self) -> bool {
!self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
}
}
pub struct CoreShared {
pub(crate) next_node_id: u64,
pub(crate) next_subscription_id: u64,
pub(crate) next_lock_id: u64,
pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
pub(crate) next_topology_id: u64,
pub(crate) pause_buffer_cap: Option<usize>,
pub(crate) max_batch_drain_iterations: u32,
pub(crate) currently_firing: Vec<NodeId>,
pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
pub(crate) binding: Arc<dyn BindingBoundary>,
}
impl Drop for CoreShared {
fn drop(&mut self) {
let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
std::mem::take(&mut self.pending_scratch_release);
for mut scratch in queued {
scratch.release_handles(&*self.binding);
}
}
}
pub struct CoreState {
pub(crate) nodes: HashMap<NodeId, NodeRecord>,
pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
pub(crate) binding: Arc<dyn BindingBoundary>,
}
static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);
pub struct Core {
pub(crate) shared: std::cell::RefCell<CoreShared>,
pub(crate) state: std::cell::RefCell<CoreState>,
pub(crate) mailbox: Arc<crate::mailbox::CoreMailbox>,
pub(crate) deferred: std::rc::Rc<crate::mailbox::DeferQueue>,
pub(crate) binding: Arc<dyn BindingBoundary>,
pub(crate) generation: u64,
}
impl Drop for Core {
fn drop(&mut self) {
self.mailbox.close();
self.deferred.close();
for op in self.mailbox.take_all() {
match op {
crate::mailbox::MailboxOp::Emit(_, h) | crate::mailbox::MailboxOp::Error(_, h) => {
self.binding.release_handle(h);
}
crate::mailbox::MailboxOp::Complete(_) | crate::mailbox::MailboxOp::Defer(_) => {}
}
}
}
}
pub trait CoreFull {
fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError>;
fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError>;
fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId;
fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError>;
fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId);
fn emit(&self, node_id: NodeId, handle: HandleId);
fn complete(&self, node_id: NodeId);
fn error(&self, node_id: NodeId, handle: HandleId);
fn teardown(&self, node_id: NodeId);
fn invalidate(&self, node_id: NodeId);
fn cache_of(&self, node_id: NodeId) -> HandleId;
fn has_fired_once(&self, node_id: NodeId) -> bool;
fn kind_of(&self, node_id: NodeId) -> Option<NodeKind>;
fn deps_of(&self, node_id: NodeId) -> Vec<NodeId>;
fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind>;
fn is_dirty(&self, node_id: NodeId) -> bool;
fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value>;
fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox>;
fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue>;
fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary>;
fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId);
fn complete_or_defer(&self, node_id: NodeId);
fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId);
}
impl CoreFull for Core {
#[inline]
fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError> {
Core::register_state(self, initial, partial)
}
#[inline]
fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
Core::register_producer(self, fn_id)
}
#[inline]
fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
Core::subscribe(self, node_id, sink)
}
#[inline]
fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError> {
Core::try_subscribe(self, node_id, sink)
}
#[inline]
fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
Core::unsubscribe(self, node_id, sub_id);
}
#[inline]
fn emit(&self, node_id: NodeId, handle: HandleId) {
Core::emit(self, node_id, handle);
}
#[inline]
fn complete(&self, node_id: NodeId) {
Core::complete(self, node_id);
}
#[inline]
fn error(&self, node_id: NodeId, handle: HandleId) {
Core::error(self, node_id, handle);
}
#[inline]
fn teardown(&self, node_id: NodeId) {
Core::teardown(self, node_id);
}
#[inline]
fn invalidate(&self, node_id: NodeId) {
Core::invalidate(self, node_id);
}
#[inline]
fn cache_of(&self, node_id: NodeId) -> HandleId {
Core::cache_of(self, node_id)
}
#[inline]
fn has_fired_once(&self, node_id: NodeId) -> bool {
Core::has_fired_once(self, node_id)
}
#[inline]
fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
Core::kind_of(self, node_id)
}
#[inline]
fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
Core::deps_of(self, node_id)
}
#[inline]
fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
Core::is_terminal(self, node_id)
}
#[inline]
fn is_dirty(&self, node_id: NodeId) -> bool {
Core::is_dirty(self, node_id)
}
#[inline]
fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value> {
self.binding_ptr().serialize_handle(handle)
}
#[inline]
fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
Core::mailbox(self)
}
#[inline]
fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
Core::defer_queue(self)
}
#[inline]
fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary> {
Core::binding(self)
}
#[inline]
fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
if self.try_emit(node_id, new_handle).is_err() {
self.binding.retain_handle(new_handle);
self.push_deferred_producer_op(DeferredProducerOp::Emit {
node_id,
handle: new_handle,
});
}
}
#[inline]
fn complete_or_defer(&self, node_id: NodeId) {
if self.try_complete(node_id).is_err() {
self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
}
}
#[inline]
fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
if self.try_error(node_id, error_handle).is_err() {
self.binding.retain_handle(error_handle);
self.push_deferred_producer_op(DeferredProducerOp::Error {
node_id,
handle: error_handle,
});
}
}
}
struct ScratchReleaseGuard<'a> {
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
binding: &'a dyn BindingBoundary,
}
impl<'a> ScratchReleaseGuard<'a> {
fn new(
scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
binding: &'a dyn BindingBoundary,
) -> Self {
Self { scratch, binding }
}
fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
self.scratch.take()
}
}
impl Drop for ScratchReleaseGuard<'_> {
fn drop(&mut self) {
if let Some(mut scratch) = self.scratch.take() {
scratch.release_handles(self.binding);
}
}
}
pub(crate) struct St<'a> {
state: std::cell::RefMut<'a, CoreState>,
pub(crate) shared: std::cell::RefMut<'a, CoreShared>,
}
impl<'a> St<'a> {
#[inline]
pub(crate) fn new(core: &'a Core) -> Self {
let state = core.state.borrow_mut();
let shared = core.shared.borrow_mut();
Self { state, shared }
}
pub(crate) fn alloc_node_id(&mut self) -> NodeId {
let id = NodeId::new(self.shared.next_node_id);
self.shared.next_node_id += 1;
id
}
pub(crate) fn alloc_sub_id(&mut self) -> SubscriptionId {
let id = SubscriptionId(self.shared.next_subscription_id);
self.shared.next_subscription_id += 1;
id
}
}
impl std::ops::Deref for St<'_> {
type Target = CoreState;
#[inline]
fn deref(&self) -> &CoreState {
&self.state
}
}
impl std::ops::DerefMut for St<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut CoreState {
&mut self.state
}
}
impl Core {
#[must_use]
pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
Self {
shared: std::cell::RefCell::new(CoreShared {
next_node_id: 1,
next_subscription_id: 1,
next_lock_id: 1u64 << 31,
currently_firing: Vec::new(),
pause_buffer_cap: None,
max_batch_drain_iterations: 10_000,
topology_sinks: HashMap::new(),
next_topology_id: 1,
pending_scratch_release: Vec::new(),
binding: binding.clone(),
}),
state: std::cell::RefCell::new(CoreState {
nodes: HashMap::new(),
children: HashMap::new(),
binding: binding.clone(),
}),
mailbox: Arc::new(crate::mailbox::CoreMailbox::new()),
deferred: std::rc::Rc::new(crate::mailbox::DeferQueue::new()),
binding,
generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
}
}
pub(crate) fn lock_state(&self) -> St<'_> {
St::new(self)
}
pub(crate) fn with_shared<R>(&self, f: impl FnOnce(&mut CoreShared) -> R) -> R {
f(&mut self.shared.borrow_mut())
}
#[allow(dead_code)]
pub(crate) fn with_shard<R>(&self, f: impl FnOnce(&mut crate::node::CoreState) -> R) -> R {
f(&mut self.state.borrow_mut())
}
#[must_use]
pub fn same_dispatcher(&self, other: &Core) -> bool {
self.generation == other.generation
}
pub fn drain_mailbox(&self) {
let mailbox = Arc::clone(&self.mailbox);
let deferred = std::rc::Rc::clone(&self.deferred);
let max_ops = self.with_shared(|sh| sh.max_batch_drain_iterations);
let mut rounds = 0u32;
loop {
mailbox.drain_into(max_ops, |op| match op {
crate::mailbox::MailboxOp::Emit(node_id, handle) => self.emit(node_id, handle),
crate::mailbox::MailboxOp::Complete(node_id) => self.complete(node_id),
crate::mailbox::MailboxOp::Error(node_id, handle) => self.error(node_id, handle),
crate::mailbox::MailboxOp::Defer(f) => f(self),
});
deferred.drain_into(max_ops, |f| f(self));
if !mailbox.is_runnable() && !deferred.is_runnable() {
break;
}
rounds += 1;
assert!(
rounds < max_ops,
"drain_mailbox: id-mailbox / defer-queue mutual re-post \
livelock (> {max_ops} rounds) — a Defer/Emit pair is \
re-posting across the two queues every round. Tune via \
Core::set_max_batch_drain_iterations only with concrete \
evidence the workload needs more."
);
}
}
pub fn post_defer(&self, f: crate::mailbox::DeferFn) -> bool {
self.deferred.post(f)
}
#[must_use]
pub fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
std::rc::Rc::clone(&self.deferred)
}
#[must_use]
pub fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
Arc::clone(&self.mailbox)
}
#[must_use]
pub fn binding(&self) -> Arc<dyn BindingBoundary> {
Arc::clone(&self.binding)
}
pub fn push_deferred_producer_op(&self, op: DeferredProducerOp) {
match op {
DeferredProducerOp::Emit { node_id, handle } => {
self.emit(node_id, handle);
self.binding.release_handle(handle);
}
DeferredProducerOp::Complete { node_id } => {
self.complete(node_id);
}
DeferredProducerOp::Error { node_id, handle } => {
self.error(node_id, handle);
self.binding.release_handle(handle);
}
DeferredProducerOp::Callback(f) => {
f();
}
}
}
#[inline]
pub(crate) fn drain_deferred_producer_ops(&self) {}
}
impl Core {
#[doc(hidden)]
#[must_use]
pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
crate::batch::with_wave_state(|ws| {
ws.pending_notify
.get(&node)
.map(|entry| entry.batches.len())
})
}
pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
self.lock_state().shared.pause_buffer_cap = cap;
}
pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
let cap = match cap {
Some(0) => None,
other => other,
};
let to_release: Vec<HandleId> = {
let mut s = self.lock_state();
let rec = s.require_node_mut(node_id);
rec.replay_buffer_cap = cap;
match cap {
None => rec.replay_buffer.drain(..).collect(),
Some(c) => {
let mut drained = Vec::new();
while rec.replay_buffer.len() > c {
if let Some(h) = rec.replay_buffer.pop_front() {
drained.push(h);
}
}
drained
}
}
};
for h in to_release {
self.binding.release_handle(h);
}
}
pub fn set_pausable_mode(
&self,
node_id: NodeId,
mode: PausableMode,
) -> Result<(), SetPausableModeError> {
let mut s = self.lock_state();
let rec = s
.nodes
.get_mut(&node_id)
.ok_or(SetPausableModeError::UnknownNode(node_id))?;
if rec.pause_state.is_paused() {
return Err(SetPausableModeError::WhilePaused);
}
rec.pausable = mode;
Ok(())
}
pub fn set_max_batch_drain_iterations(&self, cap: u32) {
assert!(cap > 0, "max_batch_drain_iterations must be > 0");
self.lock_state().shared.max_batch_drain_iterations = cap;
}
pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
let dep_ids: Vec<NodeId> = {
let s = self.lock_state();
let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
rec.dep_ids_vec()
};
let tier = message.tier();
if tier == 3 || tier == 5 {
return Err(UpError::TierForbidden { tier });
}
for dep_id in dep_ids {
match message {
Message::Pause(lock) => {
let _ = self.pause(dep_id, lock);
}
Message::Resume(lock) => {
let _ = self.resume(dep_id, lock);
}
Message::Invalidate => {
self.invalidate(dep_id);
}
Message::Teardown => {
self.teardown(dep_id);
}
_ => {}
}
}
Ok(())
}
#[must_use]
pub fn alloc_lock_id(&self) -> LockId {
let mut s = self.lock_state();
let id = LockId::new(s.shared.next_lock_id);
s.shared.next_lock_id += 1;
id
}
#[must_use]
pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
&self.binding
}
#[allow(clippy::too_many_lines)]
pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
let NodeRegistration {
deps,
fn_or_op,
opts,
} = reg;
let NodeOpts {
initial,
equals,
partial,
is_dynamic,
pausable,
replay_buffer,
terminal_as_real_input,
} = opts;
let (fn_id, op) = match fn_or_op {
Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
None => (None, None),
};
let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
if op.is_some() && deps.is_empty() {
return Err(RegisterError::OperatorWithoutDeps);
}
if initial != NO_HANDLE && !is_state_shape {
return Err(RegisterError::InitialOnlyForStateNodes);
}
let scratch = match op {
Some(operator_op) => self.make_op_scratch(operator_op)?,
None => None,
};
let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
let mut s = self.lock_state();
for &dep in &deps {
if !s.nodes.contains_key(&dep) {
return Err(RegisterError::UnknownDep(dep));
}
}
for &dep in &deps {
let dep_rec = s.require_node(dep);
if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
return Err(RegisterError::TerminalDep(dep));
}
}
let installed_scratch = scratch_guard.take();
let id = s.alloc_node_id();
let tracked: HashSet<usize> = if op.is_some() {
(0..deps.len()).collect()
} else if is_dynamic {
HashSet::new()
} else if fn_id.is_some() && !deps.is_empty() {
(0..deps.len()).collect()
} else {
HashSet::new()
};
let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
let topo_rank = if deps.is_empty() {
0
} else {
deps.iter()
.filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
.max()
.unwrap_or(0)
.saturating_add(1)
};
let rec = NodeRecord {
dep_records,
fn_id,
op,
is_dynamic,
equals,
cache: initial,
has_fired_once: initial != NO_HANDLE,
subscribers: HashMap::new(),
subscribers_revision: 0,
tracked,
dirty: false,
involved_this_wave: false,
pause_state: PauseState::Active,
pausable,
replay_buffer_cap: replay_buffer,
replay_buffer: VecDeque::new(),
terminal: None,
has_received_teardown: false,
resubscribable: false,
meta_companions: Vec::new(),
partial,
terminal_as_real_input,
topo_rank,
received_mask: 0,
involved_mask: 0,
op_scratch: installed_scratch,
};
s.nodes.insert(id, rec);
s.children.insert(id, HashSet::new());
for &dep in &deps {
s.children.entry(dep).or_default().insert(id);
}
drop(s);
self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
Ok(id)
}
pub fn register_state(
&self,
initial: HandleId,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: Vec::new(),
fn_or_op: None,
opts: NodeOpts {
initial,
partial,
..NodeOpts::default()
},
})
}
pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: Vec::new(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
partial: true,
..NodeOpts::default()
},
})
}
pub fn register_derived(
&self,
deps: &[NodeId],
fn_id: FnId,
equals: EqualsMode,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
equals,
partial,
..NodeOpts::default()
},
})
}
pub fn register_dynamic(
&self,
deps: &[NodeId],
fn_id: FnId,
equals: EqualsMode,
partial: bool,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
opts: NodeOpts {
equals,
partial,
is_dynamic: true,
..NodeOpts::default()
},
})
}
fn make_op_scratch(
&self,
op: OperatorOp,
) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
Self::make_op_scratch_with_binding(&*self.binding, op)
}
pub(crate) fn make_op_scratch_with_binding(
binding: &dyn BindingBoundary,
op: OperatorOp,
) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
use crate::op_state::{
DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
TakeWhileState,
};
match op {
OperatorOp::Scan { seed, .. } => {
if seed == NO_HANDLE {
return Err(RegisterError::OperatorSeedSentinel);
}
let state = Box::new(ScanState { acc: seed });
binding.retain_handle(seed);
Ok(Some(state))
}
OperatorOp::Reduce { seed, .. } => {
if seed == NO_HANDLE {
return Err(RegisterError::OperatorSeedSentinel);
}
let state = Box::new(ReduceState { acc: seed });
binding.retain_handle(seed);
Ok(Some(state))
}
OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
OperatorOp::Last { default } => {
let state = Box::new(LastState {
latest: NO_HANDLE,
default,
});
if default != NO_HANDLE {
binding.retain_handle(default);
}
Ok(Some(state))
}
OperatorOp::TapFirst { .. } => {
Ok(Some(Box::new(crate::op_state::TapFirstState::default())))
}
OperatorOp::Settle { .. } => {
Ok(Some(Box::new(crate::op_state::SettleState::default())))
}
OperatorOp::Map { .. }
| OperatorOp::Filter { .. }
| OperatorOp::Combine { .. }
| OperatorOp::WithLatestFrom { .. }
| OperatorOp::Merge
| OperatorOp::Tap { .. }
| OperatorOp::Valve => Ok(None),
}
}
pub fn register_operator(
&self,
deps: &[NodeId],
op: OperatorOp,
opts: OperatorOpts,
) -> Result<NodeId, RegisterError> {
self.register(NodeRegistration {
deps: deps.to_vec(),
fn_or_op: Some(NodeFnOrOp::Op(op)),
opts: NodeOpts {
equals: opts.equals,
partial: opts.partial,
..NodeOpts::default()
},
})
}
#[allow(clippy::needless_pass_by_value)] pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
match self.try_subscribe(node_id, sink) {
Ok(sub) => sub,
Err(e) => panic!("{e}"),
}
}
pub fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
unsubscribe_sink(self, node_id, sub_id);
}
#[allow(clippy::needless_pass_by_value)]
pub fn try_subscribe(
&self,
node_id: NodeId,
sink: Sink,
) -> Result<SubscriptionId, SubscribeError> {
let (sub_id, tier_slices, needs_activation, did_reset) = {
let mut s = self.lock_state();
let should_reject = {
let rec = s.require_node(node_id);
!rec.resubscribable && rec.terminal.is_some()
};
if should_reject {
drop(s);
return Err(SubscribeError::TornDown { node: node_id });
}
let sub_id = s.alloc_sub_id();
let needs_reset = {
let rec = s.require_node(node_id);
rec.resubscribable && rec.terminal.is_some()
};
if needs_reset {
self.reset_for_fresh_lifecycle(&mut s, node_id);
}
let (cache, is_state, first_subscriber) = {
let rec = s.require_node(node_id);
debug_assert!(
rec.terminal.is_none(),
"R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
);
debug_assert!(
!rec.has_received_teardown,
"R2.2.7.a invariant: reset clears has_received_teardown"
);
(rec.cache, rec.is_state(), rec.subscribers.is_empty())
};
let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
tier_slices.push(vec![Message::Start]);
if cache != NO_HANDLE {
tier_slices.push(vec![Message::Data(cache)]);
}
let replay_handles: Vec<HandleId> = {
let rec = s.require_node(node_id);
let cap = rec.replay_buffer_cap.unwrap_or(0);
if cap == 0 {
Vec::new()
} else {
let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
if cache != NO_HANDLE && v.last() == Some(&cache) {
v.pop();
}
v
}
};
for h in &replay_handles {
tier_slices.push(vec![Message::Data(*h)]);
}
{
let rec = s.require_node_mut(node_id);
rec.subscribers.insert(sub_id, sink.clone());
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
let needs_activation = first_subscriber && !is_state;
(sub_id, tier_slices, needs_activation, needs_reset)
};
if did_reset {
self.binding.wipe_ctx(node_id);
}
for slice in &tier_slices {
let sink_clone = sink.clone();
let slice_ref: &[Message] = slice;
let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
if let Err(panic_payload) = result {
{
let mut s = self.lock_state();
if let Some(rec) = s.nodes.get_mut(&node_id) {
rec.subscribers.remove(&sub_id);
rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
}
}
std::panic::resume_unwind(panic_payload);
}
}
if self.mailbox.is_runnable() || self.deferred.is_runnable() {
let _drain = self.begin_batch_for(node_id);
}
if needs_activation {
self.run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.activate_derived(&mut s, node_id);
});
}
self.drain_deferred_producer_ops();
Ok(sub_id)
}
pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
let mut s = self.lock_state();
let rec = s.require_node_mut(node_id);
assert!(
rec.subscribers.is_empty(),
"set_resubscribable: node already has subscribers; \
configure resubscribable before any subscribe call"
);
rec.resubscribable = resubscribable;
}
fn reset_for_fresh_lifecycle(&self, s: &mut St<'_>, node_id: NodeId) {
let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
let rec = s.require_node_mut(node_id);
let mut hs = Vec::new();
if let Some(TerminalKind::Error(h)) = rec.terminal {
hs.push(h);
}
for dr in &rec.dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
hs.push(h);
}
for &h in &dr.data_batch {
hs.push(h);
}
if dr.prev_data != NO_HANDLE {
hs.push(dr.prev_data);
}
}
let mut pulled = Vec::new();
if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
for msg in buffer.drain(..) {
if let Some(h) = msg.payload_handle() {
pulled.push(h);
}
}
}
for h in rec.replay_buffer.drain(..) {
pulled.push(h);
}
rec.terminal = None;
rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
rec.has_received_teardown = false;
for dr in &mut rec.dep_records {
dr.prev_data = NO_HANDLE;
dr.data_batch.clear();
dr.terminal = None;
dr.dirty = false;
dr.involved_this_wave = false;
}
rec.pause_state = PauseState::Active;
rec.involved_this_wave = false;
rec.dirty = false;
rec.received_mask = 0;
rec.involved_mask = 0;
if rec.is_dynamic {
rec.tracked.clear();
}
let prev_op = rec.op;
let old = std::mem::take(&mut rec.op_scratch);
(prev_op, old, hs, pulled)
};
let new_scratch = match prev_op {
Some(op) => self
.make_op_scratch(op)
.expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
None => None,
};
if let Some(scratch) = old_scratch.as_mut() {
scratch.release_handles(&*self.binding);
}
drop(old_scratch);
let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
std::mem::take(&mut s.shared.pending_scratch_release);
for mut scratch in queued {
scratch.release_handles(&*self.binding);
}
{
let rec = s.require_node_mut(node_id);
rec.op_scratch = new_scratch;
}
for h in handles_to_release {
self.binding.release_handle(h);
}
for h in pause_buffer_payloads {
self.binding.release_handle(h);
}
}
pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut order: Vec<NodeId> = Vec::new();
let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
while let Some((id, finalize)) = stack.pop() {
if finalize {
order.push(id);
continue;
}
if !visited.insert(id) {
continue;
}
stack.push((id, true));
let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
for dep_id in dep_ids {
let (dep_is_state, dep_cache, dep_has_fired) = {
let dep_rec = s.require_node(dep_id);
(dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
};
if !dep_is_state
&& dep_cache == NO_HANDLE
&& !dep_has_fired
&& !visited.contains(&dep_id)
{
stack.push((dep_id, false));
}
}
}
for &id in &order {
let (dep_ids, is_producer) = {
let rec = s.require_node(id);
(rec.dep_ids_vec(), rec.is_producer())
};
if is_producer {
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(id);
});
continue;
}
for (i, dep_id) in dep_ids.iter().copied().enumerate() {
let dep_cache = s.require_node(dep_id).cache;
if dep_cache != NO_HANDLE {
self.deliver_data_to_consumer(s, id, i, dep_cache);
}
}
}
}
pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
match self.try_emit(node_id, new_handle) {
Ok(()) => {}
Err(e) => panic!("{e}"),
}
}
pub(crate) fn try_emit(
&self,
node_id: NodeId,
new_handle: HandleId,
) -> Result<(), PartitionOrderViolation> {
assert!(
new_handle != NO_HANDLE,
"NO_HANDLE is not a valid DATA payload (R1.2.4)"
);
{
let s = self.lock_state();
let rec = s.require_node(node_id);
assert!(
rec.is_state() || rec.is_producer(),
"emit() is for state or producer nodes only; \
derived/dynamic/operator emit via their fn return value"
);
if rec.terminal.is_some() {
drop(s);
self.binding.release_handle(new_handle);
return Ok(());
}
}
self.try_run_wave_for(node_id, |this| {
this.commit_emission(node_id, new_handle);
})?;
Ok(())
}
pub fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
if self.try_emit(node_id, new_handle).is_err() {
self.binding.retain_handle(new_handle);
self.push_deferred_producer_op(DeferredProducerOp::Emit {
node_id,
handle: new_handle,
});
}
}
#[must_use]
pub fn cache_of(&self, node_id: NodeId) -> HandleId {
self.lock_state().require_node(node_id).cache
}
#[must_use]
pub fn has_fired_once(&self, node_id: NodeId) -> bool {
self.lock_state().require_node(node_id).has_fired_once
}
#[must_use]
pub fn node_ids(&self) -> Vec<NodeId> {
self.lock_state().nodes.keys().copied().collect()
}
#[must_use]
pub fn node_count(&self) -> usize {
self.lock_state().nodes.len()
}
#[must_use]
pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
}
#[must_use]
pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
self.lock_state()
.nodes
.get(&node_id)
.map(NodeRecord::dep_ids_vec)
.unwrap_or_default()
}
#[must_use]
pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
self.lock_state()
.nodes
.get(&node_id)
.and_then(|r| r.terminal)
}
#[must_use]
pub fn is_dirty(&self, node_id: NodeId) -> bool {
self.lock_state()
.nodes
.get(&node_id)
.is_some_and(|r| r.dirty)
}
#[must_use]
pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
self.lock_state()
.nodes
.get(&parent)
.map(|r| r.meta_companions.clone())
.unwrap_or_default()
}
}
impl Core {
pub fn complete(&self, node_id: NodeId) {
match self.try_complete(node_id) {
Ok(()) => {}
Err(e) => panic!("{e}"),
}
}
pub(crate) fn try_complete(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
self.try_emit_terminal(node_id, TerminalKind::Complete)
}
pub fn complete_or_defer(&self, node_id: NodeId) {
match self.try_complete(node_id) {
Ok(()) => {}
Err(_) => {
self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
}
}
}
pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
match self.try_error(node_id, error_handle) {
Ok(()) => {}
Err(e) => panic!("{e}"),
}
}
pub(crate) fn try_error(
&self,
node_id: NodeId,
error_handle: HandleId,
) -> Result<(), PartitionOrderViolation> {
assert!(
error_handle != NO_HANDLE,
"NO_HANDLE is not a valid ERROR payload (R1.2.5)"
);
self.try_emit_terminal(node_id, TerminalKind::Error(error_handle))?;
self.binding.release_handle(error_handle);
Ok(())
}
pub fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
if self.try_error(node_id, error_handle).is_err() {
self.binding.retain_handle(error_handle);
self.push_deferred_producer_op(DeferredProducerOp::Error {
node_id,
handle: error_handle,
});
}
}
fn try_emit_terminal(
&self,
node_id: NodeId,
terminal: TerminalKind,
) -> Result<(), PartitionOrderViolation> {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
self.try_run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.terminate_node(&mut s, node_id, terminal);
})
}
fn terminate_node(&self, s: &mut St<'_>, node_id: NodeId, terminal: TerminalKind) {
let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
while let Some((id, t)) = work.pop() {
if s.require_node(id).terminal.is_some() {
continue; }
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
let queue_wipe = {
let rec = s.require_node(id);
rec.resubscribable && rec.subscribers.is_empty()
};
s.require_node_mut(id).terminal = Some(t);
crate::batch::with_wave_state(|ws| {
if queue_wipe {
ws.pending_wipes.push(id);
}
ws.pending_fires.remove(&id);
});
let drained: Vec<HandleId> = {
let rec = s.require_node_mut(id);
let mut drained: Vec<HandleId> = Vec::new();
if rec.pause_state.is_paused() {
let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
if let PauseState::Paused { buffer, .. } = prev {
drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
}
}
drained.extend(rec.replay_buffer.drain(..));
drained
};
for h in drained {
self.binding.release_handle(h);
}
let msg = match t {
TerminalKind::Complete => Message::Complete,
TerminalKind::Error(h) => Message::Error(h),
};
self.queue_notify(s, id, msg);
let child_ids: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(id);
let Some(idx) = dep_idx else { continue };
{
let child = s.require_node_mut(child_id);
if child.dep_records[idx].terminal.is_some() {
continue;
}
child.dep_records[idx].terminal = Some(t);
}
if let TerminalKind::Error(h) = t {
self.binding.retain_handle(h);
}
let action = {
let child = s.require_node(child_id);
if child.terminal.is_some() {
ChildAction::None } else if child.all_deps_terminal() {
if child.skips_auto_cascade() {
ChildAction::QueueFire
} else {
ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
}
} else {
ChildAction::None
}
};
match action {
ChildAction::None => {}
ChildAction::Cascade(t_child) => {
work.push((child_id, t_child));
}
ChildAction::QueueFire => {
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(child_id);
});
}
}
}
}
}
}
enum ChildAction {
None,
Cascade(TerminalKind),
QueueFire,
}
fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
for dr in dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
return TerminalKind::Error(h);
}
}
TerminalKind::Complete
}
impl Core {
pub fn teardown(&self, node_id: NodeId) {
match self.try_teardown(node_id) {
Ok(()) => {}
Err(e) => panic!("{e}"),
}
}
pub fn teardown_or_defer(&self, node_id: NodeId) {
match self.try_teardown(node_id) {
Ok(()) => {}
Err(_) => {
self.teardown(node_id);
}
}
}
fn try_teardown(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
let torn_down_for_wave = torn_down.clone();
self.try_run_wave_for(node_id, move |this| {
let mut s = this.lock_state();
let collected = this.teardown_inner(&mut s, node_id);
torn_down_for_wave.lock().extend(collected);
})?;
let ids = std::mem::take(&mut *torn_down.lock());
for id in ids {
self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
}
Ok(())
}
fn teardown_inner(&self, s: &mut St<'_>, root: NodeId) -> Vec<NodeId> {
enum Action {
Visit(NodeId),
EmitTeardown(NodeId),
}
let mut stack: Vec<Action> = vec![Action::Visit(root)];
let mut torn_down: Vec<NodeId> = Vec::new();
while let Some(action) = stack.pop() {
match action {
Action::Visit(id) => {
if s.require_node(id).has_received_teardown {
continue; }
s.require_node_mut(id).has_received_teardown = true;
let children: Vec<NodeId> = s
.children
.get(&id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for &child in children.iter().rev() {
stack.push(Action::Visit(child));
}
stack.push(Action::EmitTeardown(id));
let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
for &meta in metas.iter().rev() {
stack.push(Action::Visit(meta));
}
}
Action::EmitTeardown(id) => {
let already_terminal = s.require_node(id).terminal.is_some();
if !already_terminal {
self.terminate_node(s, id, TerminalKind::Complete);
}
self.queue_notify(s, id, Message::Teardown);
torn_down.push(id);
}
}
}
torn_down
}
pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
assert!(parent != companion, "node cannot be its own meta companion");
let mut s = self.lock_state();
assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
assert!(
s.nodes.contains_key(&companion),
"unknown companion {companion:?}"
);
let metas = &mut s.require_node_mut(parent).meta_companions;
if metas.contains(&companion) {
return;
}
metas.push(companion);
}
}
impl Core {
pub fn invalidate(&self, node_id: NodeId) {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
self.run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.invalidate_inner(&mut s, node_id);
});
}
pub fn invalidate_or_defer(&self, node_id: NodeId) {
{
let s = self.lock_state();
assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
}
let result = self.try_run_wave_for(node_id, |this| {
let mut s = this.lock_state();
this.invalidate_inner(&mut s, node_id);
});
if result.is_err() {
self.invalidate(node_id);
}
}
fn invalidate_inner(&self, s: &mut St<'_>, root: NodeId) {
let mut work: Vec<NodeId> = vec![root];
while let Some(node_id) = work.pop() {
let old_handle = s.require_node(node_id).cache;
if old_handle == NO_HANDLE {
continue;
}
s.require_node_mut(node_id).cache = NO_HANDLE;
self.binding.release_handle(old_handle);
crate::batch::with_wave_state(|ws| {
if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
ws.deferred_cleanup_hooks
.push((node_id, CleanupTrigger::OnInvalidate));
}
});
self.queue_notify(s, node_id, Message::Invalidate);
let child_ids: Vec<NodeId> = s
.children
.get(&node_id)
.map(|c| c.iter().copied().collect())
.unwrap_or_default();
for child_id in child_ids {
let dep_idx = s.require_node(child_id).dep_index_of(node_id);
if let Some(idx) = dep_idx {
let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
let dr = &s.require_node(child_id).dep_records[idx];
(dr.prev_data, dr.data_batch.clone())
};
{
crate::batch::with_wave_state(|ws| {
if old_prev != NO_HANDLE {
ws.deferred_handle_releases.push(old_prev);
}
for h in batch_hs {
ws.deferred_handle_releases.push(h);
}
});
}
let child_rec = s.require_node_mut(child_id);
child_rec.dep_records[idx].prev_data = NO_HANDLE;
child_rec.dep_records[idx].data_batch.clear();
if idx < 64 {
child_rec.received_mask &= !(1u64 << idx);
}
work.push(child_id);
}
}
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ResumeReport {
pub replayed: u32,
pub dropped: u32,
}
impl Core {
pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
let mut s = self.lock_state();
let rec = s
.nodes
.get_mut(&node_id)
.ok_or(PauseError::UnknownNode(node_id))?;
if rec.terminal.is_some() {
return Ok(());
}
if rec.pausable == PausableMode::Off {
return Ok(());
}
rec.pause_state.add_lock(lock_id);
Ok(())
}
pub fn resume(
&self,
node_id: NodeId,
lock_id: LockId,
) -> Result<Option<ResumeReport>, PauseError> {
let (sinks, messages, dropped, pending_wave_for_default) = {
let mut s = self.lock_state();
let rec = s
.nodes
.get_mut(&node_id)
.ok_or(PauseError::UnknownNode(node_id))?;
if rec.pausable == PausableMode::Off {
return Ok(None);
}
let was_default_mode = rec.pausable == PausableMode::Default;
let pending_wave = if was_default_mode {
rec.pause_state.take_pending_wave()
} else {
false
};
let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
if pending_wave {
rec.pause_state.mark_pending_wave();
}
return Ok(None);
};
let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
let messages: Vec<Message> = buffer.into_iter().collect();
if pending_wave && was_default_mode {
crate::batch::with_wave_state(|ws| {
ws.pending_fires.insert(node_id);
});
}
(sinks, messages, dropped, pending_wave && was_default_mode)
};
let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
if !messages.is_empty() {
for sink in &sinks {
sink(&messages);
}
for msg in &messages {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
if pending_wave_for_default {
self.run_wave_for(node_id, |_this| {
});
}
Ok(Some(ResumeReport { replayed, dropped }))
}
#[must_use]
pub fn is_paused(&self, node_id: NodeId) -> bool {
self.lock_state()
.require_node(node_id)
.pause_state
.is_paused()
}
#[must_use]
pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
self.lock_state()
.require_node(node_id)
.pause_state
.lock_count()
}
#[must_use]
pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
self.lock_state()
.require_node(node_id)
.pause_state
.contains_lock(lock_id)
}
}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum SetDepsError {
#[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
SelfDependency { n: NodeId },
#[error(
"set_deps({n:?}, ...): cycle would form via path {path:?} \
(adding {added_dep:?} → {n:?} closes the loop)"
)]
WouldCreateCycle {
n: NodeId,
added_dep: NodeId,
path: Vec<NodeId>,
},
#[error("set_deps: unknown node {0:?}")]
UnknownNode(NodeId),
#[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
NotComputeNode(NodeId),
#[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
TerminalNode { n: NodeId },
#[error(
"set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
either mark it resubscribable before terminate, or remove the dep from new_deps"
)]
TerminalDep { n: NodeId, dep: NodeId },
#[error(
"set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
(set_deps from inside the firing node's own fn would corrupt the \
Dynamic `tracked` indices snapshot taken before invoke_fn). \
Schedule the rewire outside this fire scope."
)]
ReentrantOnFiringNode { n: NodeId },
}
impl Core {
#[allow(clippy::too_many_lines)]
pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
let mut s = self.lock_state();
let (is_state, is_producer, is_terminal) = {
let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
(rec.is_state(), rec.is_producer(), rec.terminal.is_some())
};
if is_state || is_producer {
return Err(SetDepsError::NotComputeNode(n));
}
if is_terminal {
return Err(SetDepsError::TerminalNode { n });
}
if s.shared.currently_firing.contains(&n) {
return Err(SetDepsError::ReentrantOnFiringNode { n });
}
if new_deps.contains(&n) {
return Err(SetDepsError::SelfDependency { n });
}
for &d in new_deps {
if !s.nodes.contains_key(&d) {
return Err(SetDepsError::UnknownNode(d));
}
}
let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
let added: HashSet<NodeId> = new_deps_set.difference(¤t_deps).copied().collect();
for &d in &added {
if let Some(path) = self.path_from_to(&s, n, d) {
return Err(SetDepsError::WouldCreateCycle {
n,
added_dep: d,
path,
});
}
}
for &d in &added {
let dep_rec = s.require_node(d);
if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
return Err(SetDepsError::TerminalDep { n, dep: d });
}
}
let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
if added.is_empty() && removed.is_empty() {
return Ok(());
}
let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
let rec = s.require_node(n);
let old_by_node: HashMap<NodeId, &DepRecord> =
rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
let new_records: Vec<DepRecord> = new_deps_vec
.iter()
.map(|&d| {
if let Some(old) = old_by_node.get(&d) {
DepRecord {
node: d,
prev_data: old.prev_data,
dirty: old.dirty,
involved_this_wave: old.involved_this_wave,
data_batch: old.data_batch.clone(),
terminal: old.terminal,
}
} else {
DepRecord::new(d)
}
})
.collect();
let mut to_release: Vec<HandleId> = Vec::new();
for d in &removed {
if let Some(old) = old_by_node.get(d) {
if let Some(TerminalKind::Error(h)) = old.terminal {
to_release.push(h);
}
for &h in &old.data_batch {
to_release.push(h);
}
}
}
(new_records, to_release)
};
let new_topo_rank = if new_deps_vec.is_empty() {
0
} else {
new_deps_vec
.iter()
.filter(|&&d| d != n)
.filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
.max()
.unwrap_or(0)
.saturating_add(1)
};
let fire_set_deps_on_rerun;
{
let rec = s.require_node_mut(n);
fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
rec.dep_records = new_dep_records;
rec.topo_rank = new_topo_rank;
rec.received_mask = 0;
rec.involved_mask = 0;
for (i, dr) in rec.dep_records.iter().enumerate() {
if i < 64 {
if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
rec.received_mask |= 1u64 << i;
}
if dr.involved_this_wave {
rec.involved_mask |= 1u64 << i;
}
}
}
if rec.is_dynamic {
rec.tracked.clear();
rec.has_fired_once = false;
} else {
rec.tracked = (0..new_deps_vec.len()).collect();
}
}
for &removed_dep in &removed {
if let Some(set) = s.children.get_mut(&removed_dep) {
set.remove(&n);
}
}
for &added_dep in &added {
s.children.entry(added_dep).or_default().insert(n);
}
let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
if new_deps_vec.is_empty() {
crate::batch::with_wave_state(|ws| {
let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
let mut unpaired: i32 = 0;
for m in entry.iter_messages() {
match m {
crate::message::Message::Dirty => unpaired += 1,
crate::message::Message::Data(_)
| crate::message::Message::Resolved
| crate::message::Message::Complete
| crate::message::Message::Error(_)
if unpaired > 0 =>
{
unpaired -= 1;
}
_ => {}
}
}
unpaired > 0
});
if needs_auto_resolve {
ws.pending_auto_resolve.insert(n);
}
});
}
drop(s);
if fire_set_deps_on_rerun {
self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
}
self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
node: n,
old_deps: old_deps_vec,
new_deps: new_deps_vec.clone(),
});
if !added_for_wave.is_empty() {
self.run_wave_for(n, |this| {
let mut s = this.lock_state();
if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
return;
}
for added_dep in &added_for_wave {
let cache = match s.nodes.get(added_dep) {
Some(rec) => rec.cache,
None => continue, };
if cache == NO_HANDLE {
continue;
}
let dep_idx = s.require_node(n).dep_index_of(*added_dep);
if let Some(idx) = dep_idx {
this.deliver_data_to_consumer(&mut s, n, idx, cache);
}
}
});
}
for h in removed_handles {
self.binding.release_handle(h);
}
Ok(())
}
fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
if from == to {
return Some(vec![from]);
}
let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
let mut visited: HashSet<NodeId> = HashSet::new();
while let Some((cur, path)) = stack.pop() {
if !visited.insert(cur) {
continue;
}
if cur == to {
return Some(path);
}
if let Some(children) = s.children.get(&cur) {
for &child in children {
let mut new_path = path.clone();
new_path.push(child);
stack.push((child, new_path));
}
}
}
None
}
}
impl CoreState {
pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
for rec in self.nodes.values_mut() {
rec.dirty = false;
rec.involved_this_wave = false;
rec.involved_mask = 0;
for dr in &mut rec.dep_records {
let batch_len = dr.data_batch.len();
if batch_len > 0 {
for &h in &dr.data_batch[..batch_len - 1] {
ws.deferred_handle_releases.push(h);
}
if dr.prev_data != NO_HANDLE {
ws.deferred_handle_releases.push(dr.prev_data);
}
dr.prev_data = dr.data_batch[batch_len - 1];
dr.data_batch.clear();
}
dr.dirty = false;
dr.involved_this_wave = false;
}
}
}
pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
self.nodes
.get(&id)
.unwrap_or_else(|| panic!("unknown node {id:?}"))
}
pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
self.nodes
.get_mut(&id)
.unwrap_or_else(|| panic!("unknown node {id:?}"))
}
}
impl Drop for CoreState {
fn drop(&mut self) {
for rec in self.nodes.values_mut() {
if rec.cache != NO_HANDLE {
self.binding.release_handle(rec.cache);
}
if let Some(TerminalKind::Error(h)) = rec.terminal {
self.binding.release_handle(h);
}
for dr in &rec.dep_records {
if let Some(TerminalKind::Error(h)) = dr.terminal {
self.binding.release_handle(h);
}
for &h in &dr.data_batch {
self.binding.release_handle(h);
}
if dr.prev_data != NO_HANDLE {
self.binding.release_handle(dr.prev_data);
}
}
if let PauseState::Paused { buffer, .. } = &rec.pause_state {
for msg in buffer {
if let Some(h) = msg.payload_handle() {
self.binding.release_handle(h);
}
}
}
for &h in &rec.replay_buffer {
self.binding.release_handle(h);
}
if let Some(scratch) = rec.op_scratch.as_mut() {
scratch.release_handles(&*self.binding);
}
}
}
}