use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use crate::atom::{Atom, AtomTable};
use crate::error::ExecError;
use crate::ets::copy::OwnedTerm;
use crate::interpreter::{ExecutionResult, NativeServices, run_with_native_services};
use crate::module::ModuleRegistry;
use crate::namespace::NamespaceId;
use crate::native::{BifRegistryImpl, CapabilitySet};
use crate::process::heap::DEFAULT_HEAP_SIZE;
use crate::process::{CodePosition, ExitReason, Priority, Process, ProcessStatus};
use crate::term::Term;
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct WasmRunSummary {
pub executed: usize,
pub yielded: Vec<u64>,
pub waiting: Vec<u64>,
pub exited: Vec<u64>,
pub errored: Vec<u64>,
}
pub struct WasmScheduler {
atom_table: Arc<AtomTable>,
module_registry: Arc<ModuleRegistry>,
bif_registry: Arc<BifRegistryImpl>,
next_pid: u64,
processes: BTreeMap<u64, Process>,
ready: ReadyQueues,
waiting: BTreeSet<u64>,
exit_reasons: BTreeMap<u64, ExitReason>,
exit_results: BTreeMap<u64, OwnedTerm>,
exit_errors: BTreeMap<u64, ExecError>,
}
impl WasmScheduler {
#[must_use]
pub fn new(
atom_table: Arc<AtomTable>,
module_registry: Arc<ModuleRegistry>,
bif_registry: Arc<BifRegistryImpl>,
) -> Self {
Self {
atom_table,
module_registry,
bif_registry,
next_pid: 1,
processes: BTreeMap::new(),
ready: ReadyQueues::default(),
waiting: BTreeSet::new(),
exit_reasons: BTreeMap::new(),
exit_results: BTreeMap::new(),
exit_errors: BTreeMap::new(),
}
}
#[must_use]
pub fn atom_table(&self) -> &Arc<AtomTable> {
&self.atom_table
}
#[must_use]
pub fn module_registry(&self) -> &Arc<ModuleRegistry> {
&self.module_registry
}
#[must_use]
pub fn bif_registry(&self) -> &Arc<BifRegistryImpl> {
&self.bif_registry
}
pub fn spawn(
&mut self,
entry_module: Atom,
entry_function: Atom,
args: Vec<Term>,
) -> Result<u64, ExecError> {
self.spawn_in(NamespaceId::DEFAULT, entry_module, entry_function, args)
}
pub fn spawn_in(
&mut self,
namespace: NamespaceId,
entry_module: Atom,
entry_function: Atom,
args: Vec<Term>,
) -> Result<u64, ExecError> {
if namespace != NamespaceId::DEFAULT {
return Err(ExecError::Badarg);
}
let arity = u8::try_from(args.len()).map_err(|_| ExecError::Badarg)?;
let entry = self
.module_registry
.lookup_mfa(entry_module, entry_function, arity)?;
let instruction_pointer = entry.module.label_ip(entry.label)?;
let pid = self.next_pid;
self.next_pid = self.next_pid.saturating_add(1);
let mut process = Process::with_capabilities(pid, DEFAULT_HEAP_SIZE, CapabilitySet::all());
process.set_group_leader(Term::pid(pid));
process.set_priority(Priority::Normal);
process.set_namespace_id(namespace);
process.set_code_position(Some(CodePosition {
module: entry_module,
instruction_pointer,
}));
process.set_current_module(entry.module);
for (index, arg) in args.into_iter().enumerate().take(1024) {
if let Ok(register) = u16::try_from(index) {
process.set_x_reg(register, arg);
}
}
self.ready.push(pid, process.priority());
self.processes.insert(pid, process);
Ok(pid)
}
pub fn wake(&mut self, pid: u64) -> bool {
if !self.waiting.remove(&pid) {
return false;
}
let Some(process) = self.processes.get_mut(&pid) else {
return false;
};
if process.transition_to(ProcessStatus::Running).is_err() {
return false;
}
self.ready.push(pid, process.priority());
true
}
pub fn send(&mut self, pid: u64, message: Term) -> bool {
let Some(process) = self.processes.get_mut(&pid) else {
return false;
};
process.mailbox_mut().push_owned(message);
if self.waiting.contains(&pid) {
return self.wake(pid);
}
true
}
pub fn run_until_idle(&mut self) -> WasmRunSummary {
let mut summary = WasmRunSummary::default();
let budget = self.ready.len();
let mut yielded_next_tick = Vec::new();
for _ in 0..budget {
let Some(pid) = self.ready.pop() else {
break;
};
if self.waiting.contains(&pid) {
continue;
}
let Some(mut process) = self.processes.remove(&pid) else {
continue;
};
let priority = process.priority();
if !matches!(process.status(), ProcessStatus::Running) {
let _transition = process.transition_to(ProcessStatus::Running);
}
process.reset_reductions(crate::scheduler::DEFAULT_REDUCTION_BUDGET);
let Some(module) = process.current_module().cloned() else {
self.exit_errors
.insert(pid, ExecError::InvalidOperand("current module"));
summary.errored.push(pid);
continue;
};
let services = self.native_services();
let result = run_with_native_services(
&mut process,
module.as_ref(),
self.module_registry.as_ref(),
&services,
);
summary.executed += 1;
match result {
Ok(ExecutionResult::Yielded) => {
let _transition = process.transition_to(ProcessStatus::Yielded);
self.processes.insert(pid, process);
yielded_next_tick.push((pid, priority));
summary.yielded.push(pid);
}
Ok(ExecutionResult::Waiting) => {
let _transition = process.transition_to(ProcessStatus::Waiting);
self.processes.insert(pid, process);
self.waiting.insert(pid);
summary.waiting.push(pid);
}
Ok(ExecutionResult::Exited(reason)) => {
let x0 = process.x_reg(0);
let _transition = process.transition_to(ProcessStatus::Exited(reason));
self.exit_reasons.insert(pid, reason);
self.exit_results
.insert(pid, super::exit_capture::capture_term(x0));
summary.exited.push(pid);
}
Ok(ExecutionResult::DirtyCall { .. }) => {
self.exit_errors.insert(
pid,
ExecError::UnsupportedOpcode {
name: "dirty native call on wasm",
},
);
summary.errored.push(pid);
}
Err(error) => {
self.exit_errors.insert(pid, error);
summary.errored.push(pid);
}
}
}
for (pid, priority) in yielded_next_tick {
self.ready.push(pid, priority);
}
summary
}
#[must_use]
pub fn take_exit_result(&mut self, pid: u64) -> Option<OwnedTerm> {
self.exit_results.remove(&pid)
}
#[must_use]
pub fn exit_results(&self) -> Vec<(u64, Term)> {
self.exit_results
.iter()
.map(|(pid, owned)| (*pid, owned.root()))
.collect()
}
fn native_services(&self) -> NativeServices {
NativeServices {
atom_table: Some(Arc::clone(&self.atom_table)),
..NativeServices::default()
}
}
}
#[derive(Default)]
struct ReadyQueues {
max: VecDeque<u64>,
high: VecDeque<u64>,
normal: VecDeque<u64>,
low: VecDeque<u64>,
}
impl ReadyQueues {
fn push(&mut self, pid: u64, priority: Priority) {
match priority {
Priority::Max => self.max.push_back(pid),
Priority::High => self.high.push_back(pid),
Priority::Normal => self.normal.push_back(pid),
Priority::Low => self.low.push_back(pid),
}
}
fn pop(&mut self) -> Option<u64> {
self.max
.pop_front()
.or_else(|| self.high.pop_front())
.or_else(|| self.normal.pop_front())
.or_else(|| self.low.pop_front())
}
fn len(&self) -> usize {
self.max.len() + self.high.len() + self.normal.len() + self.low.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn wasm_scheduler_starts_empty_and_runs_idle_round() {
let atom_table = Arc::new(AtomTable::with_common_atoms());
let modules = Arc::new(ModuleRegistry::new());
let bifs = Arc::new(BifRegistryImpl::new());
let mut scheduler = WasmScheduler::new(atom_table, modules, bifs);
let summary = scheduler.run_until_idle();
assert_eq!(summary.executed, 0);
assert!(summary.exited.is_empty());
}
}