use std::fmt;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::atom::AtomTable;
use crate::distribution::control::DistributionSendFacility;
use crate::distribution::pg::PgFacility;
use crate::distribution::remote_link::DistributionControlFacility;
use crate::io::resource::FdInner;
use crate::io::{
CompletionRing, IoCompletion, IoError, IoFacility, IoOp, IoSink, NullSink, ResultMode,
};
use crate::native::ets_bifs::EtsFoldlState;
use crate::native::stdlib_stubs::{
lists_hof_bifs::ListsHofState,
maps_bifs::{ContinuationStep, MapsHofState},
};
use crate::process::{Priority, Process};
use crate::replay::ReplayDriver;
use crate::term::Term;
use crate::term::compare;
use crate::timer::{TimerRef, TimerWheel};
use super::distribution_bifs::GlobalNameFacility;
use super::ets_bifs::EtsFacility;
use super::group_leader::GroupLeaderFacility;
use super::io_message::IoMessageFacility;
use super::links::LinkFacility;
use super::process_info_bifs::ProcessInfoFacility;
use super::registry::RegistryFacility;
use super::select::SelectFacility;
use super::spawn::SpawnFacility;
use super::supervision::SupervisionFacility;
use super::system_info_bifs::SystemInfoFacility;
use super::{NativeKey, code_management_bifs::CodeManagementFacility};
#[derive(Clone, Copy, Debug)]
pub struct RootedTerms {
base: usize,
len: usize,
}
#[derive(Clone, Debug)]
pub struct TrampolineRequest {
pub fun: Term,
pub args: Vec<Term>,
pub continuation: Option<NativeContinuation>,
}
#[derive(Clone, Debug)]
pub enum NativeContinuation {
Maps(MapsHofState),
Lists(ListsHofState),
EtsFoldl(EtsFoldlState),
AionTimeout(AionTimeoutContinuation),
}
#[derive(Clone)]
pub struct AionTimeoutContinuation {
pub state_id: u64,
pub resume: fn(
AionTimeoutContinuation,
Term,
&mut ProcessContext<'_>,
) -> Result<ContinuationStep, Term>,
}
impl std::fmt::Debug for AionTimeoutContinuation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AionTimeoutContinuation")
.field("state_id", &self.state_id)
.finish()
}
}
impl NativeContinuation {
pub(crate) fn for_each_term(&self, f: &mut dyn FnMut(Term)) {
match self {
Self::Maps(state) => state.for_each_term(f),
Self::Lists(state) => state.for_each_term(f),
Self::EtsFoldl(state) => state.for_each_term(f),
Self::AionTimeout(_) => {}
}
}
pub(crate) fn for_each_term_mut(&mut self, f: &mut dyn FnMut(&mut Term)) {
match self {
Self::Maps(state) => state.for_each_term_mut(f),
Self::Lists(state) => state.for_each_term_mut(f),
Self::EtsFoldl(state) => state.for_each_term_mut(f),
Self::AionTimeout(_) => {}
}
}
}
#[derive(Clone, Debug)]
pub enum FileIoContinuation {
Open,
Close { fd: Arc<FdInner> },
Read { fd: Option<Arc<FdInner>> },
Write {
fd: Option<Arc<FdInner>>,
expected_len: usize,
},
SeekEof { fd: Arc<FdInner>, offset: i64 },
FileInfo,
ListDir,
MakeDir,
DelFile,
DelDir,
Rename,
Accept,
UdpSend { expected_len: usize },
UdpRecv,
UdpActiveRecv { fd: Arc<FdInner> },
TcpActiveRecv { fd: Arc<FdInner> },
Connect { fd: Arc<FdInner> },
TcpWrite {
fd: Arc<FdInner>,
remaining: Vec<u8>,
bytes_written: usize,
},
TcpRead {
fd: Arc<FdInner>,
requested_len: usize,
accumulated: Vec<u8>,
timeout_ms: Option<u64>,
},
}
pub trait FileIoFacility: Send + Sync {
fn submit_file_io(&self, pid: u64, op: IoOp, continuation: FileIoContinuation) -> u64;
fn track_submitted_file_io(&self, pid: u64, op_id: u64, continuation: FileIoContinuation);
fn take_file_io_completion(&self, pid: u64) -> Option<FileIoCompletion>;
fn cancel_pending_file_io_for_pid(&self, pid: u64);
fn ring(&self) -> &dyn CompletionRing;
}
#[derive(Debug)]
pub struct FileIoCompletion {
pub op_id: u64,
pub continuation: FileIoContinuation,
pub completion: IoCompletion,
}
pub trait TcpIoFacility: Send + Sync {
fn submit_active_tcp_read(&self, socket: Arc<FdInner>, buf_len: usize) -> Option<u64>;
}
pub trait RemoteSpawnFacility: Send + Sync {
fn remote_spawn(
&self,
caller_pid: u64,
node: crate::atom::Atom,
module: crate::atom::Atom,
function: crate::atom::Atom,
args: Vec<Term>,
options: super::spawn::SpawnOptions,
) -> Result<RemoteSpawnResult, RemoteSpawnError>;
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct RemoteSpawnResult {
pub node: crate::atom::Atom,
pub pid_number: u64,
pub serial: u64,
pub monitor_reference: Option<u64>,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum RemoteSpawnError {
Unavailable,
Failed,
}
#[derive(Copy, Clone, Debug)]
pub struct SuspendRequest {
pub timeout_ms: Option<u64>,
pub wake_on_message: bool,
pub call_id: Option<u64>,
}
pub trait SuspensionRegistrar: Send + Sync {
fn register_host_await(&self, pid: u64, call_id: u64, wake_on_message: bool);
fn cancel_host_await(&self, pid: u64, call_id: u64);
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ExceptionClass {
Error,
Throw,
Exit,
}
pub trait WasmAsyncNifFacility {
fn start_async_nif(
&self,
mfa: NativeKey,
args: &[Term],
context: &mut ProcessContext<'_>,
) -> Result<Term, Term>;
}
pub struct ProcessContext<'process> {
pid: Option<u64>,
current_native: Option<NativeKey>,
local_node: Option<crate::distribution::Node>,
net_kernel: Option<Arc<crate::distribution::NetKernel>>,
distribution_send: Option<Arc<dyn DistributionSendFacility>>,
process: Option<&'process mut Process>,
detached_allocations: Vec<Box<[u64]>>,
live_x: usize,
timers: Option<Arc<Mutex<TimerWheel>>>,
atom_table: Option<Arc<AtomTable>>,
spawn_facility: Option<Arc<dyn SpawnFacility>>,
remote_spawn_facility: Option<Arc<dyn RemoteSpawnFacility>>,
link_facility: Option<Arc<dyn LinkFacility>>,
distribution_control_facility: Option<Arc<dyn DistributionControlFacility>>,
global_name_facility: Option<Arc<dyn GlobalNameFacility>>,
group_leader_facility: Option<Arc<dyn GroupLeaderFacility>>,
supervision_facility: Option<Arc<dyn SupervisionFacility>>,
code_management_facility: Option<Arc<dyn CodeManagementFacility>>,
process_info_facility: Option<Arc<dyn ProcessInfoFacility>>,
registry_facility: Option<Arc<dyn RegistryFacility>>,
select_facility: Option<Arc<dyn SelectFacility>>,
system_info_facility: Option<Arc<dyn SystemInfoFacility>>,
ets_facility: Option<Arc<dyn EtsFacility>>,
pg_facility: Option<Arc<dyn PgFacility>>,
io_facility: Option<Arc<dyn IoFacility>>,
io_message_facility: Option<Arc<dyn IoMessageFacility>>,
file_io_facility: Option<Arc<dyn FileIoFacility>>,
tcp_io_facility: Option<Arc<dyn TcpIoFacility>>,
io_sink: Arc<dyn IoSink>,
exception_class: ExceptionClass,
exception_stacktrace: Term,
shutdown_requested: bool,
trampoline: Option<TrampolineRequest>,
suspend: Option<SuspendRequest>,
suspension_registrar: Option<Arc<dyn SuspensionRegistrar>>,
replay_driver: Option<Arc<Mutex<ReplayDriver>>>,
wasm_async_nif_facility: Option<Rc<dyn WasmAsyncNifFacility>>,
nif_private_data: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
impl fmt::Debug for ProcessContext<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProcessContext")
.field("pid", &self.pid)
.field("current_native", &self.current_native)
.field("local_node", &self.local_node)
.field("net_kernel", &self.net_kernel.as_ref().map(|_| ".."))
.field(
"distribution_send",
&self.distribution_send.as_ref().map(|_| ".."),
)
.field("process_heap", &self.process.as_ref().map(|_| ".."))
.field("live_x", &self.live_x)
.field("timers", &self.timers)
.field("atom_table", &self.atom_table.as_ref().map(|_| ".."))
.field(
"spawn_facility",
&self.spawn_facility.as_ref().map(|_| ".."),
)
.field(
"remote_spawn_facility",
&self.remote_spawn_facility.as_ref().map(|_| ".."),
)
.field("link_facility", &self.link_facility.as_ref().map(|_| ".."))
.field(
"distribution_control_facility",
&self.distribution_control_facility.as_ref().map(|_| ".."),
)
.field(
"global_name_facility",
&self.global_name_facility.as_ref().map(|_| ".."),
)
.field(
"group_leader_facility",
&self.group_leader_facility.as_ref().map(|_| ".."),
)
.field(
"supervision_facility",
&self.supervision_facility.as_ref().map(|_| ".."),
)
.field(
"code_management_facility",
&self.code_management_facility.as_ref().map(|_| ".."),
)
.field(
"process_info_facility",
&self.process_info_facility.as_ref().map(|_| ".."),
)
.field(
"registry_facility",
&self.registry_facility.as_ref().map(|_| ".."),
)
.field(
"select_facility",
&self.select_facility.as_ref().map(|_| ".."),
)
.field(
"system_info_facility",
&self.system_info_facility.as_ref().map(|_| ".."),
)
.field("ets_facility", &self.ets_facility.as_ref().map(|_| ".."))
.field("pg_facility", &self.pg_facility.as_ref().map(|_| ".."))
.field("io_facility", &self.io_facility.as_ref().map(|_| ".."))
.field(
"io_message_facility",
&self.io_message_facility.as_ref().map(|_| ".."),
)
.field(
"file_io_facility",
&self.file_io_facility.as_ref().map(|_| ".."),
)
.field(
"tcp_io_facility",
&self.tcp_io_facility.as_ref().map(|_| ".."),
)
.field("io_sink", &"..")
.field("exception_class", &self.exception_class)
.field("shutdown_requested", &self.shutdown_requested)
.field("trampoline", &self.trampoline)
.field("suspend", &self.suspend)
.field("exception_stacktrace", &self.exception_stacktrace)
.field("replay_driver", &self.replay_driver.as_ref().map(|_| ".."))
.field(
"wasm_async_nif_facility",
&self.wasm_async_nif_facility.as_ref().map(|_| ".."),
)
.field(
"nif_private_data",
&self.nif_private_data.as_ref().map(|_| ".."),
)
.finish()
}
}
impl Default for ProcessContext<'_> {
fn default() -> Self {
Self::new()
}
}
impl<'process> ProcessContext<'process> {
#[must_use]
pub fn new() -> Self {
Self {
pid: None,
current_native: None,
local_node: None,
net_kernel: None,
distribution_send: None,
process: None,
detached_allocations: Vec::new(),
live_x: 256,
timers: None,
atom_table: None,
spawn_facility: None,
remote_spawn_facility: None,
link_facility: None,
distribution_control_facility: None,
global_name_facility: None,
group_leader_facility: None,
supervision_facility: None,
code_management_facility: None,
process_info_facility: None,
registry_facility: None,
select_facility: None,
system_info_facility: None,
ets_facility: None,
pg_facility: None,
io_facility: None,
io_message_facility: None,
file_io_facility: None,
tcp_io_facility: None,
io_sink: Arc::new(NullSink),
exception_class: ExceptionClass::Error,
exception_stacktrace: Term::NIL,
trampoline: None,
suspend: None,
suspension_registrar: None,
shutdown_requested: false,
replay_driver: None,
wasm_async_nif_facility: None,
nif_private_data: None,
}
}
#[must_use]
pub fn with_timer_services(pid: u64, timers: Arc<Mutex<TimerWheel>>) -> Self {
Self {
pid: Some(pid),
current_native: None,
local_node: None,
net_kernel: None,
distribution_send: None,
process: None,
detached_allocations: Vec::new(),
live_x: 256,
timers: Some(timers),
atom_table: None,
spawn_facility: None,
remote_spawn_facility: None,
link_facility: None,
distribution_control_facility: None,
global_name_facility: None,
group_leader_facility: None,
supervision_facility: None,
code_management_facility: None,
process_info_facility: None,
registry_facility: None,
select_facility: None,
system_info_facility: None,
ets_facility: None,
pg_facility: None,
io_facility: None,
io_message_facility: None,
file_io_facility: None,
tcp_io_facility: None,
io_sink: Arc::new(NullSink),
exception_class: ExceptionClass::Error,
exception_stacktrace: Term::NIL,
trampoline: None,
suspend: None,
suspension_registrar: None,
shutdown_requested: false,
replay_driver: None,
wasm_async_nif_facility: None,
nif_private_data: None,
}
}
#[must_use]
pub fn nif_private_data(&self) -> Option<&Arc<dyn std::any::Any + Send + Sync>> {
self.nif_private_data.as_ref()
}
pub fn set_nif_private_data(&mut self, data: Option<Arc<dyn std::any::Any + Send + Sync>>) {
self.nif_private_data = data;
}
#[must_use]
pub fn replay_driver(&self) -> Option<&Arc<Mutex<ReplayDriver>>> {
self.replay_driver.as_ref()
}
pub fn set_replay_driver(&mut self, driver: Option<Arc<Mutex<ReplayDriver>>>) {
self.replay_driver = driver;
}
#[must_use]
pub fn pid(&self) -> Option<u64> {
self.pid
}
#[must_use]
pub fn current_native(&self) -> Option<NativeKey> {
self.current_native
}
pub fn set_current_native(&mut self, native: Option<NativeKey>) {
self.current_native = native;
}
#[must_use]
pub fn wasm_async_nif_facility(&self) -> Option<Rc<dyn WasmAsyncNifFacility>> {
self.wasm_async_nif_facility.clone()
}
pub fn set_wasm_async_nif_facility(&mut self, facility: Option<Rc<dyn WasmAsyncNifFacility>>) {
self.wasm_async_nif_facility = facility;
}
#[must_use]
pub fn local_node(&self) -> Option<crate::distribution::Node> {
self.local_node
}
pub fn set_local_node(&mut self, node: Option<crate::distribution::Node>) {
self.local_node = node;
}
#[must_use]
pub fn net_kernel(&self) -> Option<&crate::distribution::NetKernel> {
self.net_kernel.as_deref()
}
pub fn set_net_kernel(&mut self, net_kernel: Option<Arc<crate::distribution::NetKernel>>) {
self.net_kernel = net_kernel;
}
#[must_use]
pub fn distribution_send_facility(&self) -> Option<&dyn DistributionSendFacility> {
self.distribution_send.as_deref()
}
pub fn set_distribution_send_facility(
&mut self,
facility: Option<Arc<dyn DistributionSendFacility>>,
) {
self.distribution_send = facility;
}
#[must_use]
pub fn receive_timeout_expired(&self) -> bool {
self.process
.as_ref()
.is_some_and(|process| process.receive_timeout().is_some())
}
pub fn clear_receive_timeout(&mut self) {
if let Some(process) = self.process.as_deref_mut() {
process.set_receive_timeout(None);
process.set_receive_timer_ref(None);
}
}
pub fn cancel_pending_file_io_for_current_process(&self) {
if let (Some(pid), Some(facility)) = (self.pid, self.file_io_facility.as_ref()) {
facility.cancel_pending_file_io_for_pid(pid);
}
}
pub fn set_pid(&mut self, pid: Option<u64>) {
self.pid = pid;
}
pub fn attach_process(&mut self, process: &'process mut Process, live_x: usize) {
self.pid = Some(process.pid());
self.process = Some(process);
self.live_x = live_x;
}
pub fn detach_process(&mut self) {
self.process = None;
}
#[must_use]
pub fn process_heap(&self) -> Option<&crate::process::heap::Heap> {
self.process.as_ref().map(|process| process.heap())
}
pub fn process_mut(&mut self) -> Option<&mut Process> {
self.process.as_deref_mut()
}
pub fn send_to_attached_self(&mut self, target: u64, message: Term) -> bool {
let Some(process) = self.process.as_deref_mut() else {
return false;
};
if process.pid() != target {
return false;
}
process.mailbox_mut().push_owned(message);
#[cfg(feature = "telemetry")]
crate::telemetry::metrics::record_message_sent();
true
}
pub fn ensure_heap_space(&mut self, words: usize) -> Result<(), Term> {
let Some(process) = self.process.as_deref_mut() else {
let _ = words;
return Ok(());
};
crate::gc::ensure_space(process, words, self.live_x)
.map_err(|_| Term::atom(crate::atom::Atom::BADARG))
}
pub fn with_rooted<R>(
&mut self,
terms: &[Term],
body: impl FnOnce(&mut Self, &mut RootedTerms) -> Result<R, Term>,
) -> Result<R, Term> {
let depth = {
let process = self
.process
.as_deref_mut()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
let depth = process.native_root_depth();
for term in terms {
process.push_native_root(*term);
}
depth
};
let mut handle = RootedTerms {
base: depth,
len: terms.len(),
};
let result = body(self, &mut handle);
if let Some(process) = self.process.as_deref_mut() {
process.truncate_native_roots(depth);
}
result
}
pub fn rooted_push(&mut self, handle: &mut RootedTerms, term: Term) -> Result<(), Term> {
let process = self
.process
.as_deref_mut()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
if handle.base + handle.len != process.native_root_depth() {
return Err(Term::atom(crate::atom::Atom::BADARG));
}
process.push_native_root(term);
handle.len += 1;
Ok(())
}
#[must_use]
pub fn rooted_len(&self, handle: &RootedTerms) -> usize {
handle.len
}
pub fn rooted(&self, handle: &RootedTerms, index: usize) -> Result<Term, Term> {
let process = self
.process
.as_deref()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
if index >= handle.len {
return Err(Term::atom(crate::atom::Atom::BADARG));
}
process
.native_root(handle.base + index)
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))
}
pub fn set_rooted(
&mut self,
handle: &RootedTerms,
index: usize,
term: Term,
) -> Result<(), Term> {
let process = self
.process
.as_deref_mut()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
if index >= handle.len {
return Err(Term::atom(crate::atom::Atom::BADARG));
}
process.set_native_root(handle.base + index, term);
Ok(())
}
#[must_use]
pub fn spawn_facility(&self) -> Option<&dyn SpawnFacility> {
self.spawn_facility.as_deref()
}
pub fn set_spawn_facility(&mut self, facility: Option<Arc<dyn SpawnFacility>>) {
self.spawn_facility = facility;
}
#[must_use]
pub fn remote_spawn_facility(&self) -> Option<&dyn RemoteSpawnFacility> {
self.remote_spawn_facility.as_deref()
}
pub fn set_remote_spawn_facility(&mut self, facility: Option<Arc<dyn RemoteSpawnFacility>>) {
self.remote_spawn_facility = facility;
}
#[must_use]
pub fn link_facility(&self) -> Option<&dyn LinkFacility> {
self.link_facility.as_deref()
}
pub fn set_link_facility(&mut self, facility: Option<Arc<dyn LinkFacility>>) {
self.link_facility = facility;
}
#[must_use]
pub fn distribution_control_facility(&self) -> Option<&dyn DistributionControlFacility> {
self.distribution_control_facility.as_deref()
}
pub fn set_distribution_control_facility(
&mut self,
facility: Option<Arc<dyn DistributionControlFacility>>,
) {
self.distribution_control_facility = facility;
}
#[must_use]
pub fn global_name_facility(&self) -> Option<&dyn GlobalNameFacility> {
self.global_name_facility.as_deref()
}
pub fn set_global_name_facility(&mut self, facility: Option<Arc<dyn GlobalNameFacility>>) {
self.global_name_facility = facility;
}
#[must_use]
pub fn group_leader_facility(&self) -> Option<&dyn GroupLeaderFacility> {
self.group_leader_facility.as_deref()
}
pub fn set_group_leader_facility(&mut self, facility: Option<Arc<dyn GroupLeaderFacility>>) {
self.group_leader_facility = facility;
}
#[must_use]
pub fn supervision_facility(&self) -> Option<&dyn SupervisionFacility> {
self.supervision_facility.as_deref()
}
pub fn set_supervision_facility(&mut self, facility: Option<Arc<dyn SupervisionFacility>>) {
self.supervision_facility = facility;
}
#[must_use]
pub fn code_management_facility(&self) -> Option<&dyn CodeManagementFacility> {
self.code_management_facility.as_deref()
}
pub fn set_code_management_facility(
&mut self,
facility: Option<Arc<dyn CodeManagementFacility>>,
) {
self.code_management_facility = facility;
}
#[must_use]
pub fn atom_table(&self) -> Option<&AtomTable> {
self.atom_table.as_deref()
}
#[must_use]
pub fn atom_table_arc(&self) -> Option<Arc<AtomTable>> {
self.atom_table.clone()
}
pub fn set_atom_table(&mut self, table: Option<Arc<AtomTable>>) {
self.atom_table = table;
}
#[must_use]
pub fn process_info_facility(&self) -> Option<&dyn ProcessInfoFacility> {
self.process_info_facility.as_deref()
}
pub fn set_process_info_facility(&mut self, facility: Option<Arc<dyn ProcessInfoFacility>>) {
self.process_info_facility = facility;
}
#[must_use]
pub fn registry_facility(&self) -> Option<&dyn RegistryFacility> {
self.registry_facility.as_deref()
}
pub fn set_registry_facility(&mut self, facility: Option<Arc<dyn RegistryFacility>>) {
self.registry_facility = facility;
}
pub fn schedule_timer(
&mut self,
delay: Duration,
target_pid: u64,
message: Term,
) -> Option<TimerRef> {
let timers = self.timers.as_ref()?;
Some(
timers
.lock()
.unwrap_or_else(|error| error.into_inner())
.schedule(delay, target_pid, message),
)
}
pub fn schedule_timer_with_reference<F>(
&mut self,
delay: Duration,
target_pid: u64,
message: F,
) -> Option<TimerRef>
where
F: FnOnce(TimerRef) -> Term,
{
let timers = self.timers.as_ref()?;
let mut timers = timers.lock().unwrap_or_else(|error| error.into_inner());
let reference = timers.reserve_reference();
timers.schedule_reserved(reference, delay, target_pid, message(reference))
}
pub fn reserve_timer_reference(&mut self) -> Option<TimerRef> {
let timers = self.timers.as_ref()?;
Some(
timers
.lock()
.unwrap_or_else(|error| error.into_inner())
.reserve_reference(),
)
}
pub fn schedule_reserved_timer(
&mut self,
reference: TimerRef,
delay: Duration,
target_pid: u64,
message: Term,
) -> Option<TimerRef> {
let timers = self.timers.as_ref()?;
timers
.lock()
.unwrap_or_else(|error| error.into_inner())
.schedule_reserved(reference, delay, target_pid, message)
}
pub fn cancel_timer(&mut self, reference: TimerRef) -> Option<Duration> {
let timers = self.timers.as_ref()?;
timers
.lock()
.unwrap_or_else(|error| error.into_inner())
.cancel(reference)
}
pub const fn allocate_term(&mut self, term: Term) -> Term {
term
}
#[must_use]
pub fn select_facility(&self) -> Option<&dyn SelectFacility> {
self.select_facility.as_deref()
}
pub fn set_select_facility(&mut self, facility: Option<Arc<dyn SelectFacility>>) {
self.select_facility = facility;
}
#[must_use]
pub fn system_info_facility(&self) -> Option<&dyn SystemInfoFacility> {
self.system_info_facility.as_deref()
}
pub fn set_system_info_facility(&mut self, facility: Option<Arc<dyn SystemInfoFacility>>) {
self.system_info_facility = facility;
}
#[must_use]
pub fn ets_facility(&self) -> Option<&dyn EtsFacility> {
self.ets_facility.as_deref()
}
pub fn set_ets_facility(&mut self, facility: Option<Arc<dyn EtsFacility>>) {
self.ets_facility = facility;
}
#[must_use]
pub fn pg_facility(&self) -> Option<&dyn PgFacility> {
self.pg_facility.as_deref()
}
pub fn set_pg_facility(&mut self, facility: Option<Arc<dyn PgFacility>>) {
self.pg_facility = facility;
}
#[must_use]
pub fn io_facility(&self) -> Option<&dyn IoFacility> {
self.io_facility.as_deref()
}
pub fn set_io_facility(&mut self, facility: Option<Arc<dyn IoFacility>>) {
self.io_facility = facility;
}
#[must_use]
pub fn io_message_facility(&self) -> Option<&dyn IoMessageFacility> {
self.io_message_facility.as_deref()
}
pub fn set_io_message_facility(&mut self, facility: Option<Arc<dyn IoMessageFacility>>) {
self.io_message_facility = facility;
}
pub fn submit_io_and_suspend(&mut self, op: IoOp, mode: ResultMode) -> Result<(), IoError> {
let pid = self.pid.ok_or(IoError::MissingPid)?;
if self.io_facility.is_none() {
return Err(IoError::Unavailable);
}
let _call_id = self.request_await_suspend(None);
let Some(facility) = self.io_facility.as_ref() else {
return Err(IoError::Unavailable);
};
if let Err(error) = facility.submit_and_suspend_for_pid(pid, op, mode) {
self.cancel_requested_suspend();
return Err(error);
}
Ok(())
}
#[must_use]
pub fn file_io_facility(&self) -> Option<&dyn FileIoFacility> {
self.file_io_facility.as_deref()
}
pub fn set_file_io_facility(&mut self, facility: Option<Arc<dyn FileIoFacility>>) {
self.file_io_facility = facility;
}
#[must_use]
pub fn tcp_io_facility(&self) -> Option<&dyn TcpIoFacility> {
self.tcp_io_facility.as_deref()
}
pub fn set_tcp_io_facility(&mut self, facility: Option<Arc<dyn TcpIoFacility>>) {
self.tcp_io_facility = facility;
}
pub fn submit_file_io(
&mut self,
op: IoOp,
continuation: FileIoContinuation,
) -> Result<u64, Term> {
self.submit_file_io_with_timeout(op, continuation, None)
}
pub fn submit_file_io_with_timeout(
&mut self,
op: IoOp,
continuation: FileIoContinuation,
timeout_ms: Option<u64>,
) -> Result<u64, Term> {
let pid = self
.pid
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
if self.file_io_facility.is_none() {
return Err(Term::atom(crate::atom::Atom::BADARG));
}
let _call_id = self.request_await_suspend(timeout_ms);
let facility = self
.file_io_facility
.as_ref()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
let op_id = facility.submit_file_io(pid, op, continuation);
Ok(op_id)
}
pub fn track_submitted_file_io(
&mut self,
op_id: u64,
continuation: FileIoContinuation,
) -> Result<(), Term> {
let pid = self
.pid
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
let facility = self
.file_io_facility
.as_ref()
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))?;
facility.track_submitted_file_io(pid, op_id, continuation);
Ok(())
}
pub fn take_file_io_completion(&self) -> Option<FileIoCompletion> {
let pid = self.pid?;
self.file_io_facility.as_ref()?.take_file_io_completion(pid)
}
#[must_use]
pub fn file_completion_ring(&self) -> Option<&dyn CompletionRing> {
self.file_io_facility
.as_ref()
.map(|facility| facility.ring())
}
pub fn dict_put(&mut self, key: Term, value: Term) -> Result<Term, Term> {
let Some(process) = self.process.as_deref_mut() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_put(key, value))
}
pub fn group_leader(&self) -> Result<Term, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.group_leader())
}
pub fn priority(&self) -> Result<Priority, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.priority())
}
pub fn set_priority(&mut self, priority: Priority) -> Result<Priority, Term> {
let Some(process) = self.process.as_deref_mut() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
let old_priority = process.priority();
process.set_priority(priority);
Ok(old_priority)
}
pub fn set_attached_group_leader(&mut self, pid: u64, group_leader: Term) -> bool {
let Some(process) = self.process.as_deref_mut() else {
return false;
};
if process.pid() != pid {
return false;
}
process.set_group_leader(group_leader);
true
}
pub fn dict_get(&self, key: Term) -> Result<Term, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_get(key))
}
pub fn dict_get_all(&self) -> Result<Vec<(Term, Term)>, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_get_all().to_vec())
}
pub fn dict_len(&self) -> Result<usize, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_get_all().len())
}
pub fn dict_erase(&mut self, key: Term) -> Result<Term, Term> {
let Some(process) = self.process.as_deref_mut() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_erase(key))
}
pub fn dict_erase_all(&mut self) -> Result<Vec<(Term, Term)>, Term> {
let Some(process) = self.process.as_deref_mut() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_erase_all())
}
pub fn dict_get_keys(&self, value: Term) -> Result<Vec<Term>, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process.dict_get_keys(value))
}
pub fn dict_count_keys_for_value(&self, value: Term) -> Result<usize, Term> {
let Some(process) = self.process.as_ref() else {
return Err(Term::atom(crate::atom::Atom::BADARG));
};
Ok(process
.dict_get_all()
.iter()
.filter(|(_, existing_value)| compare::exact_eq(*existing_value, value))
.count())
}
#[must_use]
pub fn io_sink(&self) -> &dyn IoSink {
self.io_sink.as_ref()
}
pub fn set_io_sink(&mut self, sink: Arc<dyn IoSink>) {
self.io_sink = sink;
}
pub fn request_shutdown(&mut self) {
self.shutdown_requested = true;
}
pub fn take_shutdown_request(&mut self) -> bool {
let requested = self.shutdown_requested;
self.shutdown_requested = false;
requested
}
pub fn set_exception_class(&mut self, class: ExceptionClass) {
self.exception_class = class;
}
pub fn take_exception_class(&mut self) -> ExceptionClass {
let class = self.exception_class;
self.exception_class = ExceptionClass::Error;
class
}
pub fn set_trampoline(&mut self, fun: Term, args: Vec<Term>) {
self.trampoline = Some(TrampolineRequest {
fun,
args,
continuation: None,
});
}
pub fn set_continuation_trampoline(
&mut self,
fun: Term,
args: Vec<Term>,
continuation: NativeContinuation,
) {
self.trampoline = Some(TrampolineRequest {
fun,
args,
continuation: Some(continuation),
});
}
pub fn take_trampoline(&mut self) -> Option<TrampolineRequest> {
self.trampoline.take()
}
#[must_use]
pub fn has_trampoline(&self) -> bool {
self.trampoline.is_some()
}
pub fn request_suspend(&mut self, timeout_ms: Option<u64>) -> Option<u64> {
self.request_suspend_flavor(timeout_ms, true)
}
pub fn request_await_suspend(&mut self, timeout_ms: Option<u64>) -> Option<u64> {
self.request_suspend_flavor(timeout_ms, false)
}
fn request_suspend_flavor(
&mut self,
timeout_ms: Option<u64>,
wake_on_message: bool,
) -> Option<u64> {
let call_id = self
.process
.as_deref_mut()
.map(Process::allocate_suspension_call_id);
self.suspend = Some(SuspendRequest {
timeout_ms,
wake_on_message,
call_id,
});
if let (Some(pid), Some(call_id), Some(registrar)) =
(self.pid, call_id, self.suspension_registrar.as_ref())
{
registrar.register_host_await(pid, call_id, wake_on_message);
}
call_id
}
pub fn cancel_requested_suspend(&mut self) {
let Some(request) = self.suspend.take() else {
return;
};
self.cancel_suspend_request(&request);
}
pub fn cancel_suspend_request(&self, request: &SuspendRequest) {
if let (Some(pid), Some(call_id), Some(registrar)) = (
self.pid,
request.call_id,
self.suspension_registrar.as_ref(),
) {
registrar.cancel_host_await(pid, call_id);
}
}
pub fn take_suspend(&mut self) -> Option<SuspendRequest> {
self.suspend.take()
}
pub fn set_suspension_registrar(&mut self, registrar: Option<Arc<dyn SuspensionRegistrar>>) {
self.suspension_registrar = registrar;
}
pub fn set_exception_stacktrace(&mut self, trace: Term) {
self.exception_stacktrace = trace;
}
pub fn take_exception_stacktrace(&mut self) -> Term {
let stacktrace = self.exception_stacktrace;
self.exception_stacktrace = Term::NIL;
stacktrace
}
fn alloc_words(&mut self, words: usize) -> Result<&mut [u64], Term> {
self.ensure_heap_space(words)?;
self.alloc_words_prereserved(words)
}
pub fn take_detached_result(&mut self, root: Term) -> Option<crate::ets::OwnedTerm> {
if self.detached_allocations.is_empty() {
None
} else {
Some(crate::ets::OwnedTerm::from_allocations(
root,
std::mem::take(&mut self.detached_allocations),
))
}
}
fn alloc_words_prereserved(&mut self, words: usize) -> Result<&mut [u64], Term> {
if let Some(process) = self.process.as_deref_mut() {
return process
.heap_mut()
.alloc_slice(words)
.map_err(|_| Term::atom(crate::atom::Atom::BADARG));
}
self.detached_allocations
.push(vec![0; words].into_boxed_slice());
self.detached_allocations
.last_mut()
.map(|words| words.as_mut())
.ok_or_else(|| Term::atom(crate::atom::Atom::BADARG))
}
}
mod alloc;
#[cfg(test)]
mod tests;