beamr 0.4.3

A Rust runtime with the BEAM's execution model, targeting Gleam
Documentation
//! Cooperative single-threaded scheduler for `wasm32-unknown-unknown` hosts.
//!
//! The host owns the event loop and repeatedly calls [`WasmScheduler::run_until_idle`]
//! from `requestAnimationFrame`, a microtask, or an equivalent callback. No OS
//! threads, blocking I/O, dirty pools, or distribution services are started here.

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;

use crate::atom::{Atom, AtomTable};
use crate::error::ExecError;
use crate::ets::copy::OwnedTerm;
use crate::interpreter::{ExecutionResult, NativeServices, run_with_native_services};
use crate::module::ModuleRegistry;
use crate::namespace::NamespaceId;
use crate::native::{BifRegistryImpl, CapabilitySet};
use crate::process::heap::DEFAULT_HEAP_SIZE;
use crate::process::{CodePosition, ExitReason, Priority, Process, ProcessStatus};
use crate::term::Term;

/// Summary returned from one cooperative scheduler turn.
///
/// Every PID list is filled in the order the processes were handled during
/// the turn. A PID appears in at most one of the lists per turn.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct WasmRunSummary {
    /// Number of processes that received one reduction-bounded slice.
    ///
    /// A process that could not be run because it had no current module is
    /// listed in `errored` but is not counted here.
    pub executed: usize,
    /// PIDs that yielded and were requeued for a later host tick.
    pub yielded: Vec<u64>,
    /// PIDs that blocked waiting for a message or explicit wake.
    pub waiting: Vec<u64>,
    /// PIDs that exited during this turn.
    pub exited: Vec<u64>,
    /// PIDs that faulted with an interpreter error during this turn.
    pub errored: Vec<u64>,
}

/// Single-threaded cooperative scheduler for WASM.
///
/// Owns every process it spawns, the per-priority ready queues, the set of
/// blocked PIDs, and the exit information recorded when a process finishes.
pub struct WasmScheduler {
    /// Atom table handed to the interpreter through `NativeServices`.
    atom_table: Arc<AtomTable>,
    /// Registry used to resolve spawn entrypoints and passed to the interpreter.
    module_registry: Arc<ModuleRegistry>,
    /// BIF registry supplied at construction and exposed via `bif_registry()`.
    bif_registry: Arc<BifRegistryImpl>,
    /// PID that the next spawned process will receive; starts at 1.
    next_pid: u64,
    /// Processes keyed by PID. An entry is taken out while its process runs
    /// and put back only when the process yields or starts waiting.
    processes: BTreeMap<u64, Process>,
    /// FIFO queues of runnable PIDs, one per priority level.
    ready: ReadyQueues,
    /// PIDs that are blocked until `wake` or `send` releases them.
    waiting: BTreeSet<u64>,
    /// Exit reason recorded for each PID that exited.
    exit_reasons: BTreeMap<u64, ExitReason>,
    /// Owned copy of x(0) captured for each PID that exited.
    exit_results: BTreeMap<u64, OwnedTerm>,
    /// Error recorded for each PID that faulted instead of exiting.
    exit_errors: BTreeMap<u64, ExecError>,
}

impl WasmScheduler {
    /// Create a scheduler around the VM-global registries used by module loading
    /// and native import resolution.
    ///
    /// The scheduler starts with no processes, empty ready queues, an empty
    /// waiting set, and no recorded exits. The three registries are stored as
    /// given; nothing is loaded or registered here.
    #[must_use]
    pub fn new(
        atom_table: Arc<AtomTable>,
        module_registry: Arc<ModuleRegistry>,
        bif_registry: Arc<BifRegistryImpl>,
    ) -> Self {
        Self {
            atom_table,
            module_registry,
            bif_registry,
            // The first spawned process receives PID 1.
            next_pid: 1,
            processes: BTreeMap::new(),
            ready: ReadyQueues::default(),
            waiting: BTreeSet::new(),
            exit_reasons: BTreeMap::new(),
            exit_results: BTreeMap::new(),
            exit_errors: BTreeMap::new(),
        }
    }

    /// Access the atom table backing this scheduler.
    ///
    /// Returns a borrow of the shared handle; call `Arc::clone` on it to obtain
    /// an owning reference.
    #[must_use]
    pub fn atom_table(&self) -> &Arc<AtomTable> {
        &self.atom_table
    }

    /// Access the module registry backing this scheduler.
    ///
    /// This is the registry consulted when resolving spawn entrypoints.
    /// Returns a borrow of the shared handle; call `Arc::clone` on it to obtain
    /// an owning reference.
    #[must_use]
    pub fn module_registry(&self) -> &Arc<ModuleRegistry> {
        &self.module_registry
    }

    /// Access the BIF registry backing this scheduler.
    ///
    /// Returns a borrow of the shared handle; call `Arc::clone` on it to obtain
    /// an owning reference.
    #[must_use]
    pub fn bif_registry(&self) -> &Arc<BifRegistryImpl> {
        &self.bif_registry
    }

    /// Spawn a process at an exported module/function/arity entrypoint.
    ///
    /// Convenience wrapper that spawns into [`NamespaceId::DEFAULT`]; the arity
    /// is taken from the number of `args`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`WasmScheduler::spawn_in`].
    pub fn spawn(
        &mut self,
        entry_module: Atom,
        entry_function: Atom,
        args: Vec<Term>,
    ) -> Result<u64, ExecError> {
        self.spawn_in(NamespaceId::DEFAULT, entry_module, entry_function, args)
    }

    /// Spawn a process in a namespace. WASM is single-node and currently only
    /// supports the default namespace.
    ///
    /// The entrypoint is resolved as `entry_module:entry_function/N`, where `N`
    /// is `args.len()`. On success the new process has `args` loaded into
    /// x(0)..x(N-1), runs at [`Priority::Normal`], is its own group leader, and
    /// is already queued as ready. Its PID is returned.
    ///
    /// # Errors
    ///
    /// * [`ExecError::Badarg`] if `namespace` is not [`NamespaceId::DEFAULT`]
    ///   or if more than 255 arguments are supplied.
    /// * Any error from looking up the entrypoint in the module registry or
    ///   from resolving its label to an instruction pointer.
    ///
    /// Nothing is allocated or queued when an error is returned.
    pub fn spawn_in(
        &mut self,
        namespace: NamespaceId,
        entry_module: Atom,
        entry_function: Atom,
        args: Vec<Term>,
    ) -> Result<u64, ExecError> {
        if namespace != NamespaceId::DEFAULT {
            return Err(ExecError::Badarg);
        }
        // Arity must fit in a `u8`, which also bounds the register indices
        // used below to 0..=254.
        let arity = u8::try_from(args.len()).map_err(|_| ExecError::Badarg)?;
        let entry = self
            .module_registry
            .lookup_mfa(entry_module, entry_function, arity)?;
        let instruction_pointer = entry.module.label_ip(entry.label)?;

        // Only consume a PID once the entrypoint is known to be valid.
        let pid = self.next_pid;
        self.next_pid = self.next_pid.saturating_add(1);

        let mut process = Process::with_capabilities(pid, DEFAULT_HEAP_SIZE, CapabilitySet::all());
        process.set_group_leader(Term::pid(pid));
        process.set_priority(Priority::Normal);
        process.set_namespace_id(namespace);
        process.set_code_position(Some(CodePosition {
            module: entry_module,
            instruction_pointer,
        }));
        process.set_current_module(entry.module);
        // The arity check above caps `args` at 255 entries, so every index
        // fits a `u16` register number and no argument is ever dropped.
        for (register, arg) in (0u16..).zip(args) {
            process.set_x_reg(register, arg);
        }
        self.ready.push(pid, process.priority());
        self.processes.insert(pid, process);
        Ok(pid)
    }

    /// Wake a previously waiting process so it can be run by a later host tick.
    ///
    /// Returns `true` only when `pid` was in the waiting set, its process
    /// accepted the transition to [`ProcessStatus::Running`], and the PID was
    /// pushed onto the ready queue for the process's priority.
    ///
    /// Returns `false` without queueing anything when:
    ///
    /// * `pid` is not currently waiting;
    /// * `pid` is marked as waiting but no process is stored for it, in which
    ///   case the stale waiting entry is discarded;
    /// * the process rejects the transition, in which case it stays in the
    ///   waiting set so a later `wake` or `send` can try again.
    pub fn wake(&mut self, pid: u64) -> bool {
        if !self.waiting.contains(&pid) {
            return false;
        }
        let Some(process) = self.processes.get_mut(&pid) else {
            // Nothing left to wake; drop the dangling waiting entry.
            self.waiting.remove(&pid);
            return false;
        };
        if process.transition_to(ProcessStatus::Running).is_err() {
            // Keep the PID parked: taking it out of the waiting set here would
            // leave the process in neither the waiting set nor a ready queue,
            // with no way to schedule it again.
            return false;
        }
        let priority = process.priority();
        self.waiting.remove(&pid);
        self.ready.push(pid, priority);
        true
    }

    /// Append `message` to the mailbox of a local process, waking the process
    /// when it is currently blocked.
    ///
    /// Returns `false` if no process is stored under `pid`. If the process was
    /// waiting, the return value is the outcome of [`WasmScheduler::wake`]; the
    /// message has already been placed in the mailbox by then. Otherwise
    /// returns `true`.
    pub fn send(&mut self, pid: u64, message: Term) -> bool {
        match self.processes.get_mut(&pid) {
            Some(target) => {
                target.mailbox_mut().push_owned(message);
            }
            None => return false,
        }

        if self.waiting.contains(&pid) {
            self.wake(pid)
        } else {
            true
        }
    }

    /// Run one cooperative turn over the processes that are ready right now.
    ///
    /// The number of PIDs popped is capped at the ready-queue length sampled on
    /// entry. Each process that runs gets a fresh reduction budget and a single
    /// interpreter slice. What happens next depends on the outcome:
    ///
    /// * yielded: the process is stored again and requeued after the loop, so
    ///   it is not run a second time within this turn;
    /// * waiting: the process is stored again and its PID joins the waiting set;
    /// * exited: the exit reason and an owned copy of x(0) are recorded and the
    ///   process is dropped;
    /// * dirty call or interpreter error: the error is recorded and the
    ///   process is dropped.
    ///
    /// PIDs that are marked as waiting, or that no longer have a stored
    /// process, are popped and skipped. The returned summary lists what
    /// happened to every process handled in this turn.
    pub fn run_until_idle(&mut self) -> WasmRunSummary {
        let mut report = WasmRunSummary::default();
        // Yielded processes are collected here and pushed back only once the
        // turn is over.
        let mut requeue = Vec::new();
        let mut remaining = self.ready.len();

        while remaining > 0 {
            remaining -= 1;

            let pid = match self.ready.pop() {
                Some(pid) => pid,
                None => break,
            };
            if self.waiting.contains(&pid) {
                continue;
            }
            let mut process = match self.processes.remove(&pid) {
                Some(process) => process,
                None => continue,
            };

            let priority = process.priority();
            let already_running = matches!(process.status(), ProcessStatus::Running);
            if !already_running {
                let _status_change = process.transition_to(ProcessStatus::Running);
            }
            process.reset_reductions(crate::scheduler::DEFAULT_REDUCTION_BUDGET);

            let module = match process.current_module().cloned() {
                Some(module) => module,
                None => {
                    // Without a module there is nothing to execute; record the
                    // fault and let the process drop.
                    self.exit_errors
                        .insert(pid, ExecError::InvalidOperand("current module"));
                    report.errored.push(pid);
                    continue;
                }
            };

            let services = self.native_services();
            let outcome = run_with_native_services(
                &mut process,
                module.as_ref(),
                self.module_registry.as_ref(),
                &services,
            );
            report.executed += 1;

            match outcome {
                Err(error) => {
                    self.exit_errors.insert(pid, error);
                    report.errored.push(pid);
                }
                Ok(ExecutionResult::DirtyCall { .. }) => {
                    // No dirty pools exist on this target, so treat the
                    // request as a fault.
                    let unsupported = ExecError::UnsupportedOpcode {
                        name: "dirty native call on wasm",
                    };
                    self.exit_errors.insert(pid, unsupported);
                    report.errored.push(pid);
                }
                Ok(ExecutionResult::Exited(reason)) => {
                    let result_register = process.x_reg(0);
                    let _status_change = process.transition_to(ProcessStatus::Exited(reason));
                    self.exit_reasons.insert(pid, reason);
                    // Copy the result out while the process heap still
                    // exists; the process is dropped when this arm ends.
                    let captured = super::exit_capture::capture_term(result_register);
                    self.exit_results.insert(pid, captured);
                    report.exited.push(pid);
                }
                Ok(ExecutionResult::Waiting) => {
                    let _status_change = process.transition_to(ProcessStatus::Waiting);
                    self.processes.insert(pid, process);
                    self.waiting.insert(pid);
                    report.waiting.push(pid);
                }
                Ok(ExecutionResult::Yielded) => {
                    let _status_change = process.transition_to(ProcessStatus::Yielded);
                    self.processes.insert(pid, process);
                    requeue.push((pid, priority));
                    report.yielded.push(pid);
                }
            }
        }

        requeue
            .into_iter()
            .for_each(|(pid, priority)| self.ready.push(pid, priority));
        report
    }

    /// Return a process exit result captured from x(0), if available.
    ///
    /// The result is an owning deep copy that outlives the exited process.
    /// The entry is removed from the scheduler, so a second call for the same
    /// `pid` returns `None`. `None` is also returned when nothing was recorded
    /// for `pid`.
    #[must_use]
    pub fn take_exit_result(&mut self, pid: u64) -> Option<OwnedTerm> {
        self.exit_results.remove(&pid)
    }

    /// List every recorded exit result as `(pid, root term)` without removing
    /// anything from the scheduler.
    ///
    /// Entries come back in ascending PID order. Each term is the root of the
    /// copy owned by this scheduler, so it stays valid only until that entry is
    /// removed via `take_exit_result`.
    #[must_use]
    pub fn exit_results(&self) -> Vec<(u64, Term)> {
        let mut snapshot = Vec::with_capacity(self.exit_results.len());
        for (pid, owned) in &self.exit_results {
            snapshot.push((*pid, owned.root()));
        }
        snapshot
    }

    /// Build the service bundle passed to the interpreter for one slice: the
    /// scheduler's atom table, with every other service left at its default.
    fn native_services(&self) -> NativeServices {
        let atom_table = Some(Arc::clone(&self.atom_table));
        NativeServices {
            atom_table,
            ..Default::default()
        }
    }
}

/// Runnable PIDs, kept in one FIFO queue per priority level.
#[derive(Default)]
struct ReadyQueues {
    max: VecDeque<u64>,
    high: VecDeque<u64>,
    normal: VecDeque<u64>,
    low: VecDeque<u64>,
}

impl ReadyQueues {
    /// Append `pid` to the back of the queue that matches `priority`.
    fn push(&mut self, pid: u64, priority: Priority) {
        let queue = match priority {
            Priority::Max => &mut self.max,
            Priority::High => &mut self.high,
            Priority::Normal => &mut self.normal,
            Priority::Low => &mut self.low,
        };
        queue.push_back(pid);
    }

    /// Take the oldest PID from the highest-priority queue that is not empty,
    /// or `None` when every queue is empty.
    fn pop(&mut self) -> Option<u64> {
        // Listed from most to least urgent; the first hit wins.
        for queue in [
            &mut self.max,
            &mut self.high,
            &mut self.normal,
            &mut self.low,
        ] {
            if let Some(pid) = queue.pop_front() {
                return Some(pid);
            }
        }
        None
    }

    /// Total number of queued PIDs across all priority levels.
    fn len(&self) -> usize {
        [&self.max, &self.high, &self.normal, &self.low]
            .iter()
            .map(|queue| queue.len())
            .sum()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A scheduler with nothing spawned has an empty ready queue, so one turn
    /// runs no process and reports no exits.
    #[test]
    fn wasm_scheduler_starts_empty_and_runs_idle_round() {
        let mut scheduler = WasmScheduler::new(
            Arc::new(AtomTable::with_common_atoms()),
            Arc::new(ModuleRegistry::new()),
            Arc::new(BifRegistryImpl::new()),
        );

        let WasmRunSummary {
            executed, exited, ..
        } = scheduler.run_until_idle();

        assert_eq!(executed, 0);
        assert!(exited.is_empty());
    }
}