pub mod gc;
pub mod heap;
pub mod registry;
pub mod stack;
mod types;
pub use types::*;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use crate::atom::Atom;
use crate::mailbox::Mailbox;
use crate::module::Module;
use crate::namespace::NamespaceId;
use crate::native::NativeContinuation;
use crate::process::heap::Heap;
use crate::process::stack::Stack;
use crate::term::{Term, boxed::BoxedTag, compare};
/// Reduction budget a newly constructed [`Process`] starts with
/// (it is the initial value of the reduction counter in `Process::with_capabilities`).
pub const DEFAULT_REDUCTION_BUDGET: u32 = 4000;
/// All per-process state: identity, lifecycle status, heap/stack/mailbox,
/// register files, exception bookkeeping, code position and link/monitor sets.
#[derive(Debug)]
pub struct Process {
/// Process identifier passed to the constructor.
pid: u64,
/// Capability set of this process; `Process::new` grants `CapabilitySet::all()`.
capabilities: crate::native::CapabilitySet,
/// Lifecycle state; starts as `ProcessStatus::New`.
status: ProcessStatus,
/// Priority; starts as `Priority::Normal`.
priority: Priority,
/// The process heap.
heap: Heap,
/// Byte counter adjusted with saturating add/sub by the
/// `*_virtual_binary_heap` methods; reset to 0 by `terminate`.
virtual_binary_heap: usize,
/// The process stack (also provides the Y registers and pinned modules).
stack: Stack,
/// Incoming message queue.
mailbox: Mailbox,
/// Stack of installed exception handlers (push/pop at the end).
handlers: Vec<ExceptionHandler>,
/// Exception currently recorded for the process, if any.
current_exception: Option<Exception>,
/// Process dictionary as an unordered list of `(key, value)` pairs;
/// keys are compared with `compare::exact_eq`.
dictionary: Vec<(Term, Term)>,
/// Pending receive timeout, if any.
receive_timeout: Option<ReceiveTimeout>,
/// Reference of the timer associated with the receive timeout, if any.
receive_timer_ref: Option<u64>,
/// X register file (1024 slots, initialised to `Term::NIL`).
x_regs: [Term; 1024],
/// Float register file (16 slots, initialised to `0.0`).
float_regs: [f64; 16],
/// Native continuation stored on the process, if any; its terms are GC roots.
native_continuation: Option<NativeContinuation>,
/// Extra terms kept alive as GC roots, managed as a stack
/// (`push_native_root` / `truncate_native_roots`).
native_roots: Vec<Term>,
/// Raw stack trace entries recorded for the process.
raw_stacktrace: Vec<RawStackEntry>,
/// Remaining reductions; starts at `DEFAULT_REDUCTION_BUDGET`.
reduction_counter: u32,
/// Namespace the process belongs to; starts as `NamespaceId::DEFAULT`.
namespace_id: NamespaceId,
/// Current code position, if any.
code_position: Option<CodePosition>,
/// Module currently associated with the process, if any.
current_module: Option<Arc<Module>>,
/// Current `(module, function, arity)` triple, if any.
current_mfa: Option<(Atom, Atom, u8)>,
/// JIT runtime context, if any.
jit_runtime_context: Option<JitRuntimeContext>,
/// JIT status slot; consumed with `take_jit_status`.
jit_status: Option<JitStatus>,
/// Start marker of the receive wait currently being measured (telemetry builds only).
#[cfg(feature = "telemetry")]
receive_wait_started: Option<crate::telemetry::spans::ReceiveWaitStarted>,
/// Pids of linked local processes (no duplicates, never the process itself).
links: Vec<u64>,
/// Linked remote processes (no duplicates).
remote_links: Vec<RemotePid>,
/// Monitors registered on this process.
monitors: Vec<Monitor>,
/// The `trap_exit` flag; starts as `false`.
trap_exit: bool,
/// Group leader term; initially this process's own pid term when it can be
/// encoded, otherwise `Term::NIL`.
group_leader: Term,
/// `Rc` is neither `Send` nor `Sync`, so this marker keeps `Process` from
/// being auto-`Send`/`Sync`.
not_send_sync: PhantomData<Rc<()>>,
}
impl Clone for Process {
    /// Produces an independent copy of the process.
    ///
    /// The heap is cloned first and its snapshot terms are rebased against the
    /// original heap; every other field is then copied or cloned, and finally
    /// `rebase_roots_from` rewrites the root terms so that they refer to the
    /// new heap instead of the original one.
    fn clone(&self) -> Self {
        let mut heap = self.heap.clone();
        heap.rebase_snapshot_terms(&self.heap);
        let mut clone = Self {
            pid: self.pid,
            capabilities: self.capabilities.clone(),
            status: self.status,
            priority: self.priority,
            heap,
            virtual_binary_heap: self.virtual_binary_heap,
            stack: self.stack.clone(),
            mailbox: self.mailbox.clone(),
            handlers: self.handlers.clone(),
            current_exception: self.current_exception,
            dictionary: self.dictionary.clone(),
            receive_timeout: self.receive_timeout,
            receive_timer_ref: self.receive_timer_ref,
            x_regs: self.x_regs,
            float_regs: self.float_regs,
            native_continuation: self.native_continuation.clone(),
            native_roots: self.native_roots.clone(),
            raw_stacktrace: self.raw_stacktrace.clone(),
            reduction_counter: self.reduction_counter,
            namespace_id: self.namespace_id,
            code_position: self.code_position,
            current_module: self.current_module.clone(),
            current_mfa: self.current_mfa,
            jit_runtime_context: self.jit_runtime_context,
            jit_status: self.jit_status,
            // Telemetry builds carry one extra field; without this initializer the
            // struct literal is rejected (missing field) as soon as the `telemetry`
            // feature is enabled. The copy starts with no receive wait in progress.
            // NOTE(review): carry the marker over instead if `ReceiveWaitStarted`
            // is `Clone` and measuring the wait on the copy is desired.
            #[cfg(feature = "telemetry")]
            receive_wait_started: None,
            links: self.links.clone(),
            remote_links: self.remote_links.clone(),
            monitors: self.monitors.clone(),
            trap_exit: self.trap_exit,
            group_leader: self.group_leader,
            not_send_sync: PhantomData,
        };
        clone.rebase_roots_from(self);
        clone
    }
}
impl Process {
/// Creates a process with the given pid and heap size that holds every
/// capability (`CapabilitySet::all()`).
#[must_use]
pub fn new(pid: u64, heap_size: usize) -> Self {
    let capabilities = crate::native::CapabilitySet::all();
    Self::with_capabilities(pid, heap_size, capabilities)
}
/// Creates a process with an explicit capability set.
///
/// The process starts in `ProcessStatus::New` with normal priority, a heap
/// built by `Heap::new(heap_size)`, empty stack/mailbox/dictionary, all X
/// registers set to `Term::NIL`, all float registers set to `0.0`, and the
/// default reduction budget. Its group leader is its own pid term when that
/// can be encoded, otherwise `Term::NIL`.
#[must_use]
pub fn with_capabilities(
    pid: u64,
    heap_size: usize,
    capabilities: crate::native::CapabilitySet,
) -> Self {
    let heap = Heap::new(heap_size);
    let group_leader = Self::initial_group_leader(pid);
    Self {
        // Identity and scheduling state.
        pid,
        capabilities,
        status: ProcessStatus::New,
        priority: Priority::Normal,
        // Memory areas.
        heap,
        virtual_binary_heap: 0,
        stack: Stack::new(),
        mailbox: Mailbox::new(),
        // Exception and dictionary state.
        handlers: Vec::new(),
        current_exception: None,
        dictionary: Vec::new(),
        receive_timeout: None,
        receive_timer_ref: None,
        // Register files.
        x_regs: [Term::NIL; 1024],
        float_regs: [0.0; 16],
        native_continuation: None,
        native_roots: Vec::new(),
        raw_stacktrace: Vec::new(),
        reduction_counter: DEFAULT_REDUCTION_BUDGET,
        namespace_id: NamespaceId::DEFAULT,
        // Code location and JIT state.
        code_position: None,
        current_module: None,
        current_mfa: None,
        jit_runtime_context: None,
        jit_status: None,
        #[cfg(feature = "telemetry")]
        receive_wait_started: None,
        // Links, monitors and signalling flags.
        links: Vec::new(),
        remote_links: Vec::new(),
        monitors: Vec::new(),
        trap_exit: false,
        group_leader,
        not_send_sync: PhantomData,
    }
}
/// Returns the process identifier.
#[must_use]
pub const fn pid(&self) -> u64 {
self.pid
}
/// Computes the group leader a new process starts with: the pid encoded as a
/// term, or `Term::NIL` when `Term::try_pid` cannot encode it.
const fn initial_group_leader(pid: u64) -> Term {
    if let Some(own_pid) = Term::try_pid(pid) {
        own_pid
    } else {
        Term::NIL
    }
}
/// Returns the capability set currently assigned to the process.
#[must_use]
pub const fn capabilities(&self) -> &crate::native::CapabilitySet {
&self.capabilities
}
/// Replaces the capability set of the process.
pub fn set_capabilities(&mut self, capabilities: crate::native::CapabilitySet) {
self.capabilities = capabilities;
}
/// Returns the current lifecycle status.
#[must_use]
pub const fn status(&self) -> ProcessStatus {
self.status
}
/// Returns the current priority.
#[must_use]
pub const fn priority(&self) -> Priority {
self.priority
}
/// Sets the priority.
pub const fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
}
/// Moves the process to `next` if the status state machine allows it.
///
/// # Errors
///
/// Returns `ProcessError::InvalidStatusTransition` (carrying the current and
/// the requested status) when the transition is not permitted; the status is
/// left unchanged in that case.
pub fn transition_to(&mut self, next: ProcessStatus) -> Result<(), ProcessError> {
    let current = self.status;
    if !Self::can_transition(current, next) {
        return Err(ProcessError::InvalidStatusTransition {
            from: current,
            to: next,
        });
    }
    self.status = next;
    Ok(())
}
/// Status state machine: reports whether `from -> to` is a permitted edge.
///
/// Permitted edges, grouped by source state:
/// - `New` -> `Running`
/// - `Running` -> `Yielded`, `Waiting`, `Suspended`, `Exited(_)`
/// - `Yielded` / `Waiting` -> `Running`, `Suspended`
/// - `Suspended` -> `Yielded`, `Waiting`
const fn can_transition(from: ProcessStatus, to: ProcessStatus) -> bool {
    matches!(
        (from, to),
        (ProcessStatus::New, ProcessStatus::Running)
            | (
                ProcessStatus::Running,
                ProcessStatus::Yielded
                    | ProcessStatus::Waiting
                    | ProcessStatus::Suspended
                    | ProcessStatus::Exited(_)
            )
            | (
                ProcessStatus::Yielded | ProcessStatus::Waiting,
                ProcessStatus::Running | ProcessStatus::Suspended
            )
            | (
                ProcessStatus::Suspended,
                ProcessStatus::Yielded | ProcessStatus::Waiting
            )
    )
}
/// Returns a shared reference to the process heap.
#[must_use]
pub const fn heap(&self) -> &Heap {
&self.heap
}
/// Returns a mutable reference to the process heap.
pub fn heap_mut(&mut self) -> &mut Heap {
&mut self.heap
}
/// Returns the current value of the virtual binary heap counter (in bytes).
#[must_use]
pub const fn virtual_binary_heap(&self) -> usize {
self.virtual_binary_heap
}
/// Adds `bytes` to the virtual binary heap counter, saturating at `usize::MAX`.
pub fn increase_virtual_binary_heap(&mut self, bytes: usize) {
self.virtual_binary_heap = self.virtual_binary_heap.saturating_add(bytes);
}
/// Subtracts `bytes` from the virtual binary heap counter, saturating at zero.
pub(crate) fn decrease_virtual_binary_heap(&mut self, bytes: usize) {
self.virtual_binary_heap = self.virtual_binary_heap.saturating_sub(bytes);
}
/// Returns a shared reference to the process stack.
#[must_use]
pub const fn stack(&self) -> &Stack {
&self.stack
}
/// Returns a mutable reference to the process stack.
pub fn stack_mut(&mut self) -> &mut Stack {
&mut self.stack
}
/// Returns a shared reference to the mailbox.
#[must_use]
pub const fn mailbox(&self) -> &Mailbox {
&self.mailbox
}
/// Returns a mutable reference to the mailbox.
pub fn mailbox_mut(&mut self) -> &mut Mailbox {
&mut self.mailbox
}
/// Stores `value` under `key` in the process dictionary.
///
/// Keys are matched with `compare::exact_eq`. Returns the value previously
/// stored under `key`, or the atom `undefined` when the key was not present
/// (in which case a new entry is appended).
pub fn dict_put(&mut self, key: Term, value: Term) -> Term {
    let existing = self
        .dictionary
        .iter_mut()
        .find(|(stored_key, _)| compare::exact_eq(*stored_key, key));
    if let Some((_, stored_value)) = existing {
        // Overwrite in place and hand back what was there before.
        return std::mem::replace(stored_value, value);
    }
    self.dictionary.push((key, value));
    Term::atom(Atom::UNDEFINED)
}
#[must_use]
pub fn dict_get(&self, key: Term) -> Term {
self.dictionary
.iter()
.find_map(|(existing_key, value)| {
compare::exact_eq(*existing_key, key).then_some(*value)
})
.unwrap_or_else(|| Term::atom(Atom::UNDEFINED))
}
/// Returns every `(key, value)` pair of the process dictionary in storage order.
#[must_use]
pub fn dict_get_all(&self) -> &[(Term, Term)] {
    self.dictionary.as_slice()
}
pub fn dict_erase(&mut self, key: Term) -> Term {
let Some(index) = self
.dictionary
.iter()
.position(|(existing_key, _)| compare::exact_eq(*existing_key, key))
else {
return Term::atom(Atom::UNDEFINED);
};
let (_key, value) = self.dictionary.swap_remove(index);
value
}
/// Empties the process dictionary and returns all pairs it contained.
pub fn dict_erase_all(&mut self) -> Vec<(Term, Term)> {
std::mem::take(&mut self.dictionary)
}
/// Returns, in storage order, every key whose stored value is
/// `compare::exact_eq` to `value`.
#[must_use]
pub fn dict_get_keys(&self, value: Term) -> Vec<Term> {
    self.dictionary
        .iter()
        .filter(|(_, stored_value)| compare::exact_eq(*stored_value, value))
        .map(|(key, _)| *key)
        .collect()
}
/// Rewrites every term stored outside the heap so that it refers to
/// `self.heap` instead of `original.heap`.
///
/// Used right after the fields of `original` have been copied into `self`
/// (see `Clone::clone`). Each term is passed through
/// `Heap::rebase_term_from`. Covered locations: all X registers, the Y
/// registers, the mailbox, raw stack trace location info, the current
/// exception (class, reason, stack trace), the process dictionary, the group
/// leader, the native roots and the terms held by the native continuation.
fn rebase_roots_from(&mut self, original: &Self) {
    for root in &mut self.x_regs {
        *root = self.heap.rebase_term_from(*root, &original.heap);
    }
    for root in self.stack.y_regs_mut() {
        *root = self.heap.rebase_term_from(*root, &original.heap);
    }
    for root in self.mailbox.scan_iter_mut() {
        *root = self.heap.rebase_term_from(*root, &original.heap);
    }
    for entry in &mut self.raw_stacktrace {
        entry.location_info = self
            .heap
            .rebase_term_from(entry.location_info, &original.heap);
    }
    if let Some(exception) = &mut self.current_exception {
        exception.class = self.heap.rebase_term_from(exception.class, &original.heap);
        exception.reason = self.heap.rebase_term_from(exception.reason, &original.heap);
        exception.stacktrace = self
            .heap
            .rebase_term_from(exception.stacktrace, &original.heap);
    }
    for (key, value) in &mut self.dictionary {
        *key = self.heap.rebase_term_from(*key, &original.heap);
        *value = self.heap.rebase_term_from(*value, &original.heap);
    }
    self.group_leader = self
        .heap
        .rebase_term_from(self.group_leader, &original.heap);
    // Native roots and the native continuation are copied by `Clone::clone` and
    // are reported as GC roots by `roots_with_live_x`; without rebasing them
    // the copy would keep terms that still address the original heap.
    for root in &mut self.native_roots {
        *root = self.heap.rebase_term_from(*root, &original.heap);
    }
    if let Some(continuation) = &mut self.native_continuation {
        continuation.for_each_term_mut(&mut |term| {
            *term = self.heap.rebase_term_from(*term, &original.heap);
        });
    }
}
/// Collects the root terms of the process, treating the first 256 X registers
/// as live. Shorthand for `roots_with_live_x(256)`.
pub(crate) fn roots(&mut self) -> Vec<Term> {
self.roots_with_live_x(256)
}
/// Collects the root terms of the process into a flat list.
///
/// The mailbox arrival queue is drained first (`Mailbox::drain_arrival`).
/// The returned list has a fixed layout that `replace_roots_with_live_x`
/// relies on to write values back:
///
/// 1. the first `live_x` X registers (clamped to the register file size),
/// 2. the Y registers,
/// 3. the mailbox terms,
/// 4. reason and stack trace of the current exception (when one is set; the
///    exception class is not included),
/// 5. the dictionary as alternating key/value,
/// 6. the group leader,
/// 7. the native roots,
/// 8. the terms of the native continuation (when one is set).
pub(crate) fn roots_with_live_x(&mut self, live_x: usize) -> Vec<Term> {
    self.mailbox.drain_arrival();
    let live_x = live_x.min(self.x_regs.len());

    let mut roots: Vec<Term> = Vec::new();
    roots.extend(self.x_regs.iter().take(live_x));
    roots.extend(self.stack.y_regs());
    roots.extend(self.mailbox.scan_iter());
    if let Some(exception) = &self.current_exception {
        roots.push(exception.reason);
        roots.push(exception.stacktrace);
    }
    for (key, value) in &self.dictionary {
        roots.push(*key);
        roots.push(*value);
    }
    roots.push(self.group_leader);
    roots.extend(self.native_roots.iter().copied());
    if let Some(continuation) = &self.native_continuation {
        continuation.for_each_term(&mut |term| roots.push(term));
    }
    roots
}
/// Writes back a root list produced by `roots()`, treating the first 256
/// X registers as live. Shorthand for `replace_roots_with_live_x(256, roots)`.
pub(crate) fn replace_roots(&mut self, roots: &[Term]) {
self.replace_roots_with_live_x(256, roots);
}
/// Writes a root list back into the process, slot by slot.
///
/// `roots` is consumed in the layout produced by `roots_with_live_x` for the
/// same `live_x`: X registers, Y registers, mailbox terms, exception reason
/// and stack trace, dictionary key/value pairs, group leader, native roots and
/// finally the native continuation terms. When `roots` is shorter than the
/// number of slots, the remaining slots keep their current values.
pub(crate) fn replace_roots_with_live_x(&mut self, live_x: usize, roots: &[Term]) {
    // One cursor over `roots`; every slot visited consumes exactly one entry.
    // A slice iterator keeps returning `None` once exhausted, so trailing
    // slots are simply left untouched.
    let mut incoming = roots.iter().copied();
    let mut assign = |slot: &mut Term| {
        if let Some(value) = incoming.next() {
            *slot = value;
        }
    };

    let live_x = live_x.min(self.x_regs.len());
    for slot in self.x_regs.iter_mut().take(live_x) {
        assign(slot);
    }
    for slot in self.stack.y_regs_mut() {
        assign(slot);
    }
    for slot in self.mailbox.scan_iter_mut() {
        assign(slot);
    }
    if let Some(exception) = &mut self.current_exception {
        assign(&mut exception.reason);
        assign(&mut exception.stacktrace);
    }
    for (key, value) in &mut self.dictionary {
        assign(key);
        assign(value);
    }
    assign(&mut self.group_leader);
    for slot in &mut self.native_roots {
        assign(slot);
    }
    if let Some(continuation) = &mut self.native_continuation {
        continuation.for_each_term_mut(&mut assign);
    }
}
/// Pushes `handler` onto the exception handler stack.
pub fn push_exception_handler(&mut self, handler: ExceptionHandler) {
self.handlers.push(handler);
}
/// Pops the most recently pushed exception handler, or returns `None` when
/// the handler stack is empty.
pub fn pop_exception_handler(&mut self) -> Option<ExceptionHandler> {
self.handlers.pop()
}
/// Returns the number of exception handlers currently installed.
#[must_use]
pub fn exception_handler_count(&self) -> usize {
self.handlers.len()
}
/// Replaces the recorded raw stack trace.
pub fn set_raw_stacktrace(&mut self, raw_stacktrace: Vec<RawStackEntry>) {
self.raw_stacktrace = raw_stacktrace;
}
/// Removes all recorded raw stack trace entries.
pub fn clear_raw_stacktrace(&mut self) {
self.raw_stacktrace.clear();
}
/// Returns the recorded raw stack trace entries.
#[must_use]
pub fn raw_stacktrace(&self) -> &[RawStackEntry] {
&self.raw_stacktrace
}
/// Sets or clears (`None`) the current exception.
pub const fn set_current_exception(&mut self, exception: Option<Exception>) {
self.current_exception = exception;
}
/// Returns a copy of the current exception, if one is set.
#[must_use]
pub const fn current_exception(&self) -> Option<Exception> {
self.current_exception
}
/// Sets or clears (`None`) the pending receive timeout.
pub const fn set_receive_timeout(&mut self, timeout: Option<ReceiveTimeout>) {
self.receive_timeout = timeout;
}
/// Returns the pending receive timeout, if any.
#[must_use]
pub const fn receive_timeout(&self) -> Option<ReceiveTimeout> {
self.receive_timeout
}
/// Sets or clears (`None`) the receive timer reference.
pub const fn set_receive_timer_ref(&mut self, timer_ref: Option<u64>) {
self.receive_timer_ref = timer_ref;
}
/// Returns the receive timer reference, if any.
#[must_use]
pub const fn receive_timer_ref(&self) -> Option<u64> {
self.receive_timer_ref
}
/// Records the start of a receive wait unless one is already being measured;
/// an existing start marker is never overwritten.
#[cfg(feature = "telemetry")]
pub(crate) fn mark_receive_wait_started(&mut self) {
    if self.receive_wait_started.is_some() {
        return;
    }
    self.receive_wait_started = Some(crate::telemetry::spans::receive_wait_started_now());
}
/// Clears the receive-wait start marker and returns the time elapsed since it
/// was recorded, or `None` when no wait was being measured.
#[cfg(feature = "telemetry")]
pub(crate) fn take_receive_wait_duration(&mut self) -> Option<std::time::Duration> {
    let started = self.receive_wait_started.take()?;
    Some(started.elapsed())
}
/// Returns the value of X register `n`.
///
/// # Panics
///
/// Panics when `n` is not below the register file size (1024).
#[must_use]
pub fn x_reg(&self, n: u16) -> Term {
self.x_regs[usize::from(n)]
}
/// Stores `value` in X register `n`.
///
/// # Panics
///
/// Panics when `n` is not below the register file size (1024).
pub fn set_x_reg(&mut self, n: u16, value: Term) {
self.x_regs[usize::from(n)] = value;
}
/// Reads float register `index`.
///
/// # Errors
///
/// Returns `ProcessError::InvalidFloatRegister` when `index` is outside the
/// float register file.
pub fn get_float_reg(&self, index: u16) -> Result<f64, ProcessError> {
    match self.float_regs.get(usize::from(index)) {
        Some(register) => Ok(*register),
        None => Err(ProcessError::InvalidFloatRegister { index }),
    }
}
/// Writes `value` into float register `index`.
///
/// # Errors
///
/// Returns `ProcessError::InvalidFloatRegister` when `index` is outside the
/// float register file; nothing is written in that case.
pub fn set_float_reg(&mut self, index: u16, value: f64) -> Result<(), ProcessError> {
    match self.float_regs.get_mut(usize::from(index)) {
        Some(register) => {
            *register = value;
            Ok(())
        }
        None => Err(ProcessError::InvalidFloatRegister { index }),
    }
}
/// Returns the whole X register file.
#[must_use]
pub const fn x_regs(&self) -> &[Term; 1024] {
&self.x_regs
}
/// Returns the whole X register file for in-place modification.
pub fn x_regs_mut(&mut self) -> &mut [Term; 1024] {
&mut self.x_regs
}
/// Sets or clears (`None`) the stored native continuation.
pub fn set_native_continuation(&mut self, continuation: Option<NativeContinuation>) {
self.native_continuation = continuation;
}
/// Removes and returns the stored native continuation, leaving `None` behind.
pub fn take_native_continuation(&mut self) -> Option<NativeContinuation> {
self.native_continuation.take()
}
/// Appends `term` to the native root stack and returns the index of the new
/// slot (usable with `native_root` / `set_native_root`).
pub(crate) fn push_native_root(&mut self, term: Term) -> usize {
    // The new slot lands at the current length.
    let slot_index = self.native_roots.len();
    self.native_roots.push(term);
    slot_index
}
/// Returns the native root stored at `index`, or `None` when out of range.
pub(crate) fn native_root(&self, index: usize) -> Option<Term> {
self.native_roots.get(index).copied()
}
/// Overwrites the native root at `index` with `term`.
/// An out-of-range `index` is silently ignored.
pub(crate) fn set_native_root(&mut self, index: usize, term: Term) {
if let Some(slot) = self.native_roots.get_mut(index) {
*slot = term;
}
}
/// Returns the number of native roots currently on the stack.
pub(crate) fn native_root_depth(&self) -> usize {
self.native_roots.len()
}
/// Drops native roots so that at most `depth` remain; has no effect when the
/// stack is already that short.
pub(crate) fn truncate_native_roots(&mut self, depth: usize) {
self.native_roots.truncate(depth);
}
/// Reports whether a native continuation is currently stored.
#[must_use]
pub fn has_native_continuation(&self) -> bool {
self.native_continuation.is_some()
}
/// Returns the number of reductions left.
#[must_use]
pub const fn reduction_counter(&self) -> u32 {
self.reduction_counter
}
/// Consumes `n` reductions, saturating at zero.
pub fn decrement_reductions(&mut self, n: u32) {
self.reduction_counter = self.reduction_counter.saturating_sub(n);
}
/// Reports whether the reduction counter has reached zero.
#[must_use]
pub const fn reductions_exhausted(&self) -> bool {
self.reduction_counter == 0
}
/// Sets the reduction counter to `budget`.
pub const fn reset_reductions(&mut self, budget: u32) {
self.reduction_counter = budget;
}
/// Returns the namespace the process belongs to.
#[must_use]
pub const fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
/// Assigns the process to `namespace_id`.
pub const fn set_namespace_id(&mut self, namespace_id: NamespaceId) {
self.namespace_id = namespace_id;
}
/// Returns the stored code position, if any.
#[must_use]
pub const fn code_position(&self) -> Option<CodePosition> {
self.code_position
}
/// Sets or clears (`None`) the stored code position.
pub const fn set_code_position(&mut self, code_position: Option<CodePosition>) {
self.code_position = code_position;
}
/// Returns the module currently associated with the process, if any.
#[must_use]
pub fn current_module(&self) -> Option<&Arc<Module>> {
self.current_module.as_ref()
}
/// Reports whether the process still refers to exactly this module instance.
///
/// The comparison is by pointer identity (`Arc::ptr_eq`), not by content.
/// The current module is checked first; the modules pinned by the stack are
/// only consulted when the current module does not match.
#[must_use]
pub fn references_module(&self, module: &Arc<Module>) -> bool {
    let is_current = matches!(
        &self.current_module,
        Some(current) if Arc::ptr_eq(current, module)
    );
    is_current
        || self
            .stack
            .pinned_modules()
            .any(|pinned| Arc::ptr_eq(pinned, module))
}
/// Sets the module currently associated with the process.
pub fn set_current_module(&mut self, module: Arc<Module>) {
self.current_module = Some(module);
}
/// Clears the current module, dropping this process's reference to it.
pub fn clear_current_module(&mut self) {
self.current_module = None;
}
/// Returns the current `(module, function, arity)` triple, if any.
#[must_use]
pub const fn current_mfa(&self) -> Option<(Atom, Atom, u8)> {
self.current_mfa
}
/// Sets or clears (`None`) the current `(module, function, arity)` triple.
pub const fn set_current_mfa(&mut self, current_mfa: Option<(Atom, Atom, u8)>) {
self.current_mfa = current_mfa;
}
/// Returns the stored JIT runtime context, if any.
#[must_use]
pub const fn jit_runtime_context(&self) -> Option<JitRuntimeContext> {
self.jit_runtime_context
}
/// Sets or clears (`None`) the JIT runtime context.
pub const fn set_jit_runtime_context(&mut self, context: Option<JitRuntimeContext>) {
self.jit_runtime_context = context;
}
/// Sets or clears (`None`) the JIT status slot.
pub const fn set_jit_status(&mut self, status: Option<JitStatus>) {
self.jit_status = status;
}
/// Removes and returns the JIT status, leaving `None` behind.
pub fn take_jit_status(&mut self) -> Option<JitStatus> {
self.jit_status.take()
}
/// Returns the pids of the linked local processes.
#[must_use]
pub fn links(&self) -> &[u64] {
&self.links
}
/// Links this process to the local process `pid`.
///
/// Returns `true` when a new link was recorded, and `false` when `pid` is
/// this process itself or is already linked (nothing changes in that case).
pub fn add_link(&mut self, pid: u64) -> bool {
    let rejected = pid == self.pid || self.links.contains(&pid);
    if !rejected {
        self.links.push(pid);
    }
    !rejected
}
/// Removes the link to the local process `pid`.
///
/// Returns `true` when a link was actually removed.
pub fn remove_link(&mut self, pid: u64) -> bool {
    let previous_len = self.links.len();
    self.links.retain(|&linked| linked != pid);
    // `retain` can only shrink the list, so a shorter list means a removal.
    self.links.len() < previous_len
}
/// Removes and returns all local links, leaving the link list empty.
pub fn take_links(&mut self) -> Vec<u64> {
std::mem::take(&mut self.links)
}
/// Returns the linked remote processes.
#[must_use]
pub fn remote_links(&self) -> &[RemotePid] {
&self.remote_links
}
/// Links this process to the remote process `pid`.
///
/// Returns `true` when a new link was recorded and `false` when `pid` was
/// already linked.
pub fn add_remote_link(&mut self, pid: RemotePid) -> bool {
    if self.remote_links.contains(&pid) {
        false
    } else {
        self.remote_links.push(pid);
        true
    }
}
/// Removes the link to the remote process `pid`.
///
/// Returns `true` when a link was actually removed.
pub fn remove_remote_link(&mut self, pid: RemotePid) -> bool {
    let previous_len = self.remote_links.len();
    self.remote_links.retain(|linked| *linked != pid);
    // `retain` can only shrink the list, so a shorter list means a removal.
    self.remote_links.len() < previous_len
}
/// Removes and returns all remote links, leaving the list empty.
pub fn take_remote_links(&mut self) -> Vec<RemotePid> {
std::mem::take(&mut self.remote_links)
}
/// Returns the monitors registered on this process, in insertion order.
#[must_use]
pub const fn monitors(&self) -> &Vec<Monitor> {
&self.monitors
}
/// Appends `monitor` to the monitor list (no duplicate check is performed).
pub fn add_monitor(&mut self, monitor: Monitor) {
self.monitors.push(monitor);
}
/// Removes and returns the first monitor whose reference equals `reference`,
/// or returns `None` when there is no such monitor. The order of the
/// remaining monitors is preserved.
pub fn remove_monitor(&mut self, reference: u64) -> Option<Monitor> {
    let position = self
        .monitors
        .iter()
        .position(|monitor| monitor.reference() == reference);
    position.map(|index| self.monitors.remove(index))
}
/// Returns the `trap_exit` flag.
#[must_use]
pub const fn trap_exit(&self) -> bool {
self.trap_exit
}
/// Sets the `trap_exit` flag.
pub const fn set_trap_exit(&mut self, trap_exit: bool) {
self.trap_exit = trap_exit;
}
/// Returns the group leader term.
#[must_use]
pub const fn group_leader(&self) -> Term {
self.group_leader
}
/// Sets the group leader term.
pub const fn set_group_leader(&mut self, group_leader: Term) {
self.group_leader = group_leader;
}
/// Ends the process with `reason` and releases its execution state.
///
/// The status is set to `ProcessStatus::Exited(reason)` directly, without
/// going through `transition_to`, so this works from any status.
/// Identity, capabilities, priority, namespace, links, remote links,
/// monitors, `trap_exit` and JIT fields are left untouched.
///
/// NOTE(review): `raw_stacktrace` and `group_leader` are also left untouched
/// although the heap is replaced below — confirm neither can still hold a
/// heap-backed term at this point.
pub fn terminate(&mut self, reason: ExitReason) {
// Resources must be handled while the old heap is still in place:
// FD resources are closed first, then refcounted resources are released.
self.close_owned_fd_resources();
self.status = ProcessStatus::Exited(reason);
crate::gc::release_all_refcounted_resources(self);
self.virtual_binary_heap = 0;
// Replace the memory areas with fresh, minimal ones.
self.heap = Heap::new(1);
self.stack = Stack::new();
self.mailbox = Mailbox::new();
self.handlers.clear();
self.current_exception = None;
self.dictionary.clear();
self.receive_timeout = None;
self.receive_timer_ref = None;
// Reset both register files to their initial values.
self.x_regs = [Term::NIL; 1024];
self.float_regs = [0.0; 16];
self.native_continuation = None;
self.native_roots.clear();
self.reduction_counter = 0;
self.code_position = None;
self.current_module = None;
self.current_mfa = None;
}
/// Walks every boxed object on the heap and, for each one tagged
/// `BoxedTag::FdResource`, asks the I/O layer to close it on behalf of this
/// process (`close_owned_resource_at` receives the object pointer and this
/// process's pid).
fn close_owned_fd_resources(&mut self) {
let owner_pid = self.pid;
self.heap().visit_boxed_objects(|ptr, tag, _words| {
if tag == BoxedTag::FdResource {
crate::io::resource::close_owned_resource_at(ptr, owner_pid);
}
});
}
}
#[cfg(test)]
mod tests;