use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use aion_core::{ActivityError, Payload};
use beamr::atom::AtomTable;
use beamr::module::ModuleRegistry;
use beamr::native::{BifRegistryImpl, NativeRegistrationError};
use beamr::process::ExitReason;
use beamr::scheduler::{Scheduler, SchedulerConfig};
use beamr::term::Term;
use crate::error::EngineError;
use super::config::{RuntimeConfig, SignalDeliveryConfig};
use super::nif::{Mfa, NifRegistration};
use super::payload::payload_to_term;
pub type Pid = u64;
type RetainedHeap = Box<[u64]>;
type RetainedHeaps = Vec<RetainedHeap>;
type RetainedSpawnHeaps = Arc<dashmap::DashMap<Pid, Mutex<RetainedHeaps>>>;
#[derive(Debug, Default, Eq, PartialEq)]
pub struct RuntimeInput {
terms: Vec<Term>,
heaps: RetainedHeaps,
}
impl RuntimeInput {
pub fn from_payload(payload: &Payload) -> Result<Self, EngineError> {
let (term, heaps) = payload_to_term(payload)?.into_parts();
Ok(Self {
terms: vec![term],
heaps,
})
}
#[must_use]
pub fn arity(&self) -> u8 {
u8::try_from(self.terms.len()).unwrap_or(u8::MAX)
}
fn into_spawn_parts(self) -> (Vec<Term>, RetainedHeaps) {
(self.terms, self.heaps)
}
}
pub struct RuntimeHandle {
pub(super) scheduler: Scheduler,
pub(super) atom_table: Arc<AtomTable>,
pub(super) module_registry: Arc<ModuleRegistry>,
pub(super) native_registry: Arc<BifRegistryImpl>,
nif_state: Arc<super::nif_state::EngineNifState>,
activity_results: Arc<dashmap::DashMap<(Pid, Pid), Payload>>,
activity_errors: Arc<dashmap::DashMap<(Pid, Pid), ActivityError>>,
registered_nif_modules: Arc<dashmap::DashSet<String>>,
spawn_heaps: RetainedSpawnHeaps,
signal_delivery: SignalDeliveryConfig,
pub(super) wake_confirmer: super::wake_confirm::WakeConfirmer,
spawned_pid_watermark: AtomicU64,
}
impl RuntimeHandle {
pub fn new(config: RuntimeConfig) -> Result<Self, EngineError> {
let atom_table = Arc::new(AtomTable::with_common_atoms());
let module_registry = Arc::new(ModuleRegistry::new());
let nif_state = Arc::new(super::nif_state::EngineNifState::default());
let scheduler_config = SchedulerConfig {
thread_count: config.thread_count,
nif_private_data: Some(Arc::clone(&nif_state) as _),
..Default::default()
};
let native_registry = Arc::new(BifRegistryImpl::new());
register_all_bifs(&native_registry, &atom_table)?;
let scheduler = Scheduler::with_code_server(
scheduler_config,
Arc::clone(&module_registry),
Arc::clone(&atom_table),
Arc::clone(&native_registry),
)
.map_err(runtime_error_from_display)?;
Ok(Self {
scheduler,
atom_table,
module_registry,
native_registry,
nif_state,
activity_results: Arc::new(dashmap::DashMap::new()),
activity_errors: Arc::new(dashmap::DashMap::new()),
registered_nif_modules: Arc::new(dashmap::DashSet::new()),
spawn_heaps: Arc::new(dashmap::DashMap::new()),
signal_delivery: config.signal_delivery,
wake_confirmer: super::wake_confirm::WakeConfirmer::new(config.signal_delivery)?,
spawned_pid_watermark: AtomicU64::new(0),
})
}
pub(crate) fn nif_state(&self) -> &Arc<super::nif_state::EngineNifState> {
&self.nif_state
}
pub(crate) fn signal_delivery(&self) -> SignalDeliveryConfig {
self.signal_delivery
}
pub fn install_nifs(&self, registration: NifRegistration) -> Result<(), EngineError> {
for entry in registration.into_entries() {
let mfa = entry.mfa;
let module = self.atom_table.intern(&mfa.module);
let function = self.atom_table.intern(&mfa.function);
let capability = beamr::native::Capability::ExternalIo;
let result = if entry.is_dirty {
self.native_registry.register_dirty(
module,
function,
mfa.arity,
entry.function,
beamr::scheduler::dirty::DirtySchedulerKind::Cpu,
capability,
)
} else {
self.native_registry.register(
module,
function,
mfa.arity,
entry.function,
capability,
)
};
result.map_err(|error| nif_registration_error(&mfa, error))?;
self.registered_nif_modules.insert(mfa.module);
}
Ok(())
}
#[must_use]
pub fn registered_nif_modules(&self) -> Vec<String> {
let mut module_names: Vec<_> = self
.registered_nif_modules
.iter()
.map(|module_name| module_name.key().clone())
.collect();
module_names.sort();
module_names
}
pub fn spawn_workflow(
&self,
deployed_module: &str,
function: &str,
input: RuntimeInput,
) -> Result<Pid, EngineError> {
self.spawn_process(deployed_module, function, input)
}
pub fn spawn_workflow_trapping(
&self,
deployed_module: &str,
function: &str,
input: RuntimeInput,
) -> Result<Pid, EngineError> {
self.release_dead_spawn_heaps();
let module = self.atom_table.intern(deployed_module);
let function = self.atom_table.intern(function);
let (terms, heaps) = input.into_spawn_parts();
let pid = self
.scheduler
.spawn_trap_exit(module, function, terms)
.map_err(runtime_error_from_display)?;
self.record_spawned_pid(pid);
self.retain_spawn_heaps(pid, heaps);
Ok(pid)
}
pub fn spawn_activity(
&self,
parent_pid: Pid,
deployed_module: &str,
function: &str,
input: RuntimeInput,
) -> Result<Pid, EngineError> {
self.release_dead_spawn_heaps();
self.ensure_live_pid(parent_pid)?;
self.wait_for_process_ready(parent_pid)?;
let arity = input.arity();
let module = self.atom_table.intern(deployed_module);
let function_atom = self.atom_table.intern(function);
let (terms, heaps) = input.into_spawn_parts();
let pid = if self.is_dirty_with_arity(deployed_module, function, arity) {
self.scheduler
.spawn_link_dirty(parent_pid, module, function_atom, terms)
.map_err(runtime_error_from_display)?
} else {
self.scheduler
.spawn_link(parent_pid, module, function_atom, terms)
.map_err(runtime_error_from_display)?
};
self.record_spawned_pid(pid);
self.retain_spawn_heaps(pid, heaps);
Ok(pid)
}
#[must_use]
pub fn is_dirty(&self, module: &str, function: &str) -> bool {
self.is_dirty_with_arity(module, function, 1)
}
#[must_use]
pub fn is_dirty_with_arity(&self, module: &str, function: &str, arity: u8) -> bool {
let module = self.atom_table.intern(module);
let function = self.atom_table.intern(function);
self.native_registry
.lookup(module, function, arity)
.is_some_and(|entry| entry.dirty_kind.is_some())
}
pub fn cancel_pid(&self, pid: Pid) -> Result<(), EngineError> {
self.ensure_live_pid(pid)?;
self.scheduler.terminate_process(pid, ExitReason::Kill);
self.release_spawn_heaps(pid);
Ok(())
}
pub fn set_trap_exit(&self, pid: Pid, value: bool) -> Result<bool, EngineError> {
self.scheduler
.set_trap_exit(pid, value)
.map_err(runtime_error_from_display)
}
#[must_use]
pub fn is_live(&self, pid: Pid) -> bool {
self.scheduler.process_table().get(pid).is_some()
}
pub fn trap_exit(&self, pid: Pid) -> Result<bool, EngineError> {
self.scheduler
.trap_exit(pid)
.ok_or_else(|| runtime_error(format!("process {pid} is not live")))
}
pub fn is_linked(&self, left: Pid, right: Pid) -> Result<bool, EngineError> {
self.ensure_live_pid(left)?;
self.ensure_live_pid(right)?;
Ok(self.scheduler.is_linked(left, right))
}
pub fn shutdown(&self) -> Result<(), EngineError> {
self.wake_confirmer.shutdown();
self.scheduler.shutdown();
self.spawn_heaps.clear();
Ok(())
}
fn spawn_process(
&self,
deployed_module: &str,
function: &str,
input: RuntimeInput,
) -> Result<Pid, EngineError> {
self.release_dead_spawn_heaps();
let module = self.atom_table.intern(deployed_module);
let function = self.atom_table.intern(function);
let (terms, heaps) = input.into_spawn_parts();
let pid = self
.scheduler
.spawn(module, function, terms)
.map_err(runtime_error_from_display)?;
self.record_spawned_pid(pid);
self.retain_spawn_heaps(pid, heaps);
Ok(pid)
}
fn record_spawned_pid(&self, pid: Pid) {
self.spawned_pid_watermark.fetch_max(pid, Ordering::AcqRel);
}
fn retain_spawn_heaps(&self, pid: Pid, heaps: RetainedHeaps) {
if heaps.is_empty() {
return;
}
self.spawn_heaps.insert(pid, Mutex::new(heaps));
}
pub(super) fn release_spawn_heaps(&self, pid: Pid) {
self.spawn_heaps.remove(&pid);
}
fn release_dead_spawn_heaps(&self) {
let dead_pids: Vec<Pid> = self
.spawn_heaps
.iter()
.filter_map(|entry| {
let pid = *entry.key();
self.scheduler
.process_table()
.get(pid)
.is_none()
.then_some(pid)
})
.collect();
for pid in dead_pids {
self.release_spawn_heaps(pid);
}
}
pub(super) fn ensure_live_pid(&self, pid: Pid) -> Result<(), EngineError> {
if self.scheduler.process_table().get(pid).is_some() {
Ok(())
} else {
Err(runtime_error(format!("process {pid} is not live")))
}
}
pub(super) fn ensure_monitorable_pid(&self, pid: Pid) -> Result<(), EngineError> {
if self.scheduler.process_table().get(pid).is_some() {
return Ok(());
}
if pid > 0 && pid <= self.spawned_pid_watermark.load(Ordering::Acquire) {
return Ok(());
}
Err(runtime_error(format!(
"process {pid} was never spawned by this runtime"
)))
}
#[cfg(test)]
pub fn register_waiting_test_module(&self, deployed_name: &str, function: &str) {
use std::collections::HashMap;
use beamr::loader::Instruction;
use beamr::loader::decode::compact::Operand;
use beamr::module::Module;
let module = self.atom_table.intern(deployed_name);
let function = self.atom_table.intern(function);
let label = 10;
self.module_registry.insert(Module {
name: module,
generation: 0,
origin: beamr::module::ModuleOrigin::Preloaded,
exports: HashMap::from([((function, 1), label)]),
label_index: HashMap::from([(label, 0)]),
code: vec![
Instruction::Label { label },
Instruction::Wait {
fail: Operand::Label(label),
},
],
function_table: Vec::new(),
line_table: Vec::new(),
literals: Vec::new(),
constant_pool: beamr::constant_pool::ConstantPool::new(),
resolved_imports: Vec::new(),
lambdas: Vec::new(),
string_table: Vec::new(),
line_info: Vec::new(),
});
}
#[cfg(test)]
pub fn spawn_test_process(&self) -> Result<Pid, EngineError> {
let pid = self.scheduler.spawn_test_process(false);
self.record_spawned_pid(pid);
Ok(pid)
}
#[cfg(test)]
pub fn spawn_test_process_with_trap_exit(&self, trap_exit: bool) -> Result<Pid, EngineError> {
let pid = self.scheduler.spawn_test_process(trap_exit);
self.record_spawned_pid(pid);
Ok(pid)
}
#[cfg(test)]
pub fn spawn_linked_test_process(&self, parent_pid: Pid) -> Result<Pid, EngineError> {
self.ensure_live_pid(parent_pid)?;
let pid = self
.scheduler
.spawn_linked_test_process(parent_pid)
.map_err(runtime_error_from_display)?;
self.record_spawned_pid(pid);
Ok(pid)
}
#[cfg(test)]
pub fn has_trapped_exit_message(
&self,
target_pid: Pid,
source_pid: Pid,
) -> Result<bool, EngineError> {
self.ensure_live_pid(target_pid)?;
Ok(self
.scheduler
.has_trapped_exit_message(target_pid, source_pid)
.unwrap_or(false))
}
#[cfg(test)]
pub fn wait_for_trapped_exit(
&self,
target_pid: Pid,
source_pid: Pid,
) -> Result<(), EngineError> {
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(50);
while std::time::Instant::now() < deadline {
if self
.scheduler
.has_trapped_exit_message(target_pid, source_pid)
.unwrap_or(false)
{
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(1));
}
Err(runtime_error(format!(
"trapped exit from {source_pid} to {target_pid} did not arrive"
)))
}
#[cfg(test)]
pub fn terminate_test_process_with_error(&self, pid: Pid) -> Result<(), EngineError> {
self.ensure_live_pid(pid)?;
self.scheduler.terminate_process(pid, ExitReason::Error);
Ok(())
}
#[cfg(test)]
pub(crate) fn lookup_native_for_test(
&self,
module: &str,
function: &str,
arity: u8,
) -> Option<beamr::native::NativeEntry> {
let module = self.atom_table.intern(module);
let function = self.atom_table.intern(function);
self.native_registry.lookup(module, function, arity)
}
#[cfg(test)]
pub(crate) fn run_until_exit_for_test(&self, pid: Pid) -> (ExitReason, Term) {
let (reason, owned_result) = self.scheduler.run_until_exit(pid);
self.release_spawn_heaps(pid);
(reason, owned_result.root())
}
#[cfg(test)]
pub(crate) fn retained_spawn_heap_count_for_test(&self) -> usize {
self.release_dead_spawn_heaps();
self.spawn_heaps.len()
}
}
fn runtime_error(reason: String) -> EngineError {
EngineError::Runtime { reason }
}
fn runtime_error_from_display(reason: impl std::fmt::Display) -> EngineError {
runtime_error(reason.to_string())
}
fn nif_registration_error(mfa: &Mfa, error: NativeRegistrationError) -> EngineError {
match error {
NativeRegistrationError::DuplicateMfa { .. } => EngineError::NifRegistration {
reason: format!("native function already registered for {}", mfa.display()),
},
}
}
fn register_all_bifs(
registry: &BifRegistryImpl,
atom_table: &AtomTable,
) -> Result<(), EngineError> {
use beamr::native::{
bifs::register_gate1_bifs, gate3_bifs::register_gate3_bifs,
gleam_ffi::register_gleam_ffi_bifs, otp_stubs::init_otp_atoms,
otp_stubs::register_otp_stubs, process_bifs::register_gate2_bifs,
selector_ffi::register_selector_bifs, stdlib_stubs::register_stdlib_stubs,
};
register_gate1_bifs(registry, atom_table).map_err(runtime_error_from_display)?;
register_gate2_bifs(registry, atom_table).map_err(runtime_error_from_display)?;
register_gate3_bifs(registry, atom_table).map_err(runtime_error_from_display)?;
register_stdlib_stubs(registry, atom_table).map_err(runtime_error_from_display)?;
register_selector_bifs(registry, atom_table).map_err(runtime_error_from_display)?;
register_gleam_ffi_bifs(registry, atom_table).map_err(runtime_error_from_display)?;
init_otp_atoms(atom_table);
register_otp_stubs(registry, atom_table).map_err(runtime_error_from_display)?;
Ok(())
}
mod delivery;
#[cfg(test)]
#[path = "handle/test_support.rs"]
mod test_support;
#[cfg(test)]
mod tests {
use aion_core::Payload;
use std::time::Duration;
use beamr::loader::Instruction;
use beamr::loader::decode::compact::Operand;
use beamr::module::{Module, ResolvedImport, ResolvedImportTarget};
use beamr::native::ProcessContext;
use beamr::term::Term;
use beamr::term::binary_ref::BinaryRef;
use super::{RuntimeHandle, RuntimeInput};
use crate::error::EngineError;
use crate::runtime::{Mfa, NifEntry, NifRegistration, RuntimeConfig, SignalDeliveryConfig};
fn forty_two(args: &[Term], _context: &mut ProcessContext) -> Result<Term, Term> {
if args.len() > 255 {
return Err(Term::small_int(0));
}
Ok(Term::small_int(42))
}
fn thirteen(args: &[Term], _context: &mut ProcessContext) -> Result<Term, Term> {
if args.len() > 255 {
return Err(Term::small_int(0));
}
Ok(Term::small_int(13))
}
fn binary_length(args: &[Term], _context: &mut ProcessContext) -> Result<Term, Term> {
match args {
[term] => BinaryRef::new(*term)
.and_then(|binary| i64::try_from(binary.as_bytes().len()).ok())
.map(Term::small_int)
.ok_or_else(|| Term::small_int(0)),
_ => Err(Term::small_int(0)),
}
}
fn native_call_module_for_test(
module: beamr::atom::Atom,
function: beamr::atom::Atom,
target_module: beamr::atom::Atom,
target_function: beamr::atom::Atom,
native_entry: Option<beamr::native::NativeEntry>,
) -> Module {
native_call_module_with_arity_for_test(
module,
function,
target_module,
target_function,
0,
native_entry,
)
}
fn native_call_module_with_arity_for_test(
module: beamr::atom::Atom,
function: beamr::atom::Atom,
target_module: beamr::atom::Atom,
target_function: beamr::atom::Atom,
arity: u8,
native_entry: Option<beamr::native::NativeEntry>,
) -> Module {
let label = 1;
let code = vec![
Instruction::Label { label },
Instruction::CallExt {
arity: Operand::Unsigned(arity.into()),
import: Operand::Unsigned(0),
},
Instruction::Return,
];
let mut module_data = Module {
name: module,
generation: 0,
origin: beamr::module::ModuleOrigin::Preloaded,
exports: std::collections::HashMap::from([((function, arity), label)]),
label_index: std::collections::HashMap::from([(label, 0)]),
code,
function_table: Vec::new(),
line_table: Vec::new(),
literals: Vec::new(),
constant_pool: beamr::constant_pool::ConstantPool::new(),
resolved_imports: Vec::new(),
lambdas: Vec::new(),
string_table: Vec::new(),
line_info: Vec::new(),
};
if let Some(native_entry) = native_entry {
module_data.resolved_imports.push(ResolvedImport {
module: target_module,
function: target_function,
arity,
target: ResolvedImportTarget::Native(native_entry),
});
}
module_data
}
fn assert_send_sync<T: Send + Sync>() {}
fn fixture_workflow_beam() -> &'static [u8] {
include_bytes!("../../tests/fixtures/aion_fixture_workflow.beam")
}
#[test]
fn runtime_handle_is_send_sync() {
assert_send_sync::<RuntimeHandle>();
}
#[test]
fn registers_spawns_and_shuts_down() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
runtime.register_module("aion_fixture_workflow", fixture_workflow_beam())?;
let pid =
runtime.spawn_workflow("aion_fixture_workflow", "wait", RuntimeInput::default())?;
assert!(runtime.cancel_pid(pid).is_ok());
runtime.shutdown()?;
Ok(())
}
#[test]
fn signal_delivery_to_dead_process_returns_typed_error()
-> Result<(), Box<dyn std::error::Error>> {
let signal_delivery =
SignalDeliveryConfig::new(Duration::ZERO, 1, Duration::ZERO, Duration::ZERO);
let runtime =
RuntimeHandle::new(RuntimeConfig::new(Some(1)).with_signal_delivery(signal_delivery))?;
let pid = runtime.spawn_test_process()?;
runtime.terminate_test_process_with_error(pid)?;
let error = runtime
.deliver_signal_received(pid)
.err()
.ok_or("dead process delivery unexpectedly succeeded")?;
assert!(matches!(error, EngineError::Runtime { .. }));
runtime.shutdown()?;
Ok(())
}
#[test]
fn duplicate_nif_mfa_returns_typed_error() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
let mfa = Mfa::new("host", "answer", 0);
let mut registration = NifRegistration::new();
registration.add_host_nifs([
NifEntry::new(mfa.clone(), forty_two),
NifEntry::dirty(mfa, thirteen),
]);
let error = runtime.install_nifs(registration).err();
assert!(matches!(
error,
Some(EngineError::NifRegistration { reason })
if reason.contains("host:answer/0")
));
assert_eq!(runtime.registered_nif_modules(), vec!["host"]);
runtime.shutdown()?;
Ok(())
}
#[test]
fn payload_binary_remains_valid_through_spawn_and_is_released()
-> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
let mfa = Mfa::new("host", "binary_length", 1);
let mut registration = NifRegistration::new();
registration.add_host_nifs([NifEntry::new(mfa, binary_length)]);
runtime.install_nifs(registration)?;
let native_entry = runtime.lookup_native_for_test("host", "binary_length", 1);
let module = native_call_module_with_arity_for_test(
runtime.atom_table.intern("payload_echo"),
runtime.atom_table.intern("run"),
runtime.atom_table.intern("host"),
runtime.atom_table.intern("binary_length"),
1,
native_entry,
);
runtime.module_registry.insert(module);
let payload = Payload::new(
aion_core::ContentType::Json,
br#"{"hello":"world"}"#.to_vec(),
);
let pid =
runtime.spawn_workflow("payload_echo", "run", RuntimeInput::from_payload(&payload)?)?;
assert_eq!(runtime.retained_spawn_heap_count_for_test(), 1);
let (reason, result) = runtime.run_until_exit_for_test(pid);
assert_eq!(reason, beamr::process::ExitReason::Normal);
assert_eq!(
result.as_small_int(),
Some(i64::try_from(payload.bytes().len()).unwrap_or(0))
);
assert_eq!(runtime.retained_spawn_heap_count_for_test(), 0);
runtime.shutdown()?;
Ok(())
}
#[test]
fn workflow_outcome_releases_payload_heaps() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
let mfa = Mfa::new("host", "binary_length", 1);
let mut registration = NifRegistration::new();
registration.add_host_nifs([NifEntry::new(mfa, binary_length)]);
runtime.install_nifs(registration)?;
let native_entry = runtime.lookup_native_for_test("host", "binary_length", 1);
let module = native_call_module_with_arity_for_test(
runtime.atom_table.intern("payload_workflow_outcome"),
runtime.atom_table.intern("run"),
runtime.atom_table.intern("host"),
runtime.atom_table.intern("binary_length"),
1,
native_entry,
);
runtime.module_registry.insert(module);
let payload = Payload::new(
aion_core::ContentType::Json,
br#"{"workflow":"outcome"}"#.to_vec(),
);
let pid = runtime.spawn_workflow(
"payload_workflow_outcome",
"run",
RuntimeInput::from_payload(&payload)?,
)?;
assert_eq!(runtime.retained_spawn_heap_count_for_test(), 1);
let outcome = runtime.workflow_outcome(pid)?;
assert_eq!(
outcome?,
Payload::from_json(&serde_json::json!(payload.bytes().len()))?
);
assert_eq!(runtime.retained_spawn_heap_count_for_test(), 0);
runtime.shutdown()?;
Ok(())
}
#[test]
fn repeated_completed_payload_spawns_do_not_accumulate_retained_heaps()
-> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
let mfa = Mfa::new("host", "binary_length", 1);
let mut registration = NifRegistration::new();
registration.add_host_nifs([NifEntry::new(mfa, binary_length)]);
runtime.install_nifs(registration)?;
let native_entry = runtime.lookup_native_for_test("host", "binary_length", 1);
let module = native_call_module_with_arity_for_test(
runtime.atom_table.intern("payload_echo_many"),
runtime.atom_table.intern("run"),
runtime.atom_table.intern("host"),
runtime.atom_table.intern("binary_length"),
1,
native_entry,
);
runtime.module_registry.insert(module);
let payload = Payload::new(
aion_core::ContentType::Json,
br#"{"iteration":true}"#.to_vec(),
);
for _ in 0..1_000 {
let pid = runtime.spawn_workflow(
"payload_echo_many",
"run",
RuntimeInput::from_payload(&payload)?,
)?;
let (reason, result) = runtime.run_until_exit_for_test(pid);
assert_eq!(reason, beamr::process::ExitReason::Normal);
assert_eq!(
result.as_small_int(),
Some(i64::try_from(payload.bytes().len()).unwrap_or(0))
);
assert_eq!(runtime.retained_spawn_heap_count_for_test(), 0);
}
runtime.shutdown()?;
Ok(())
}
#[test]
fn distinct_nifs_are_registered_and_callable() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RuntimeHandle::new(RuntimeConfig::new(None))?;
let mut registration = NifRegistration::new();
registration.add_engine_nifs().add_host_nifs([
NifEntry::new(Mfa::new("host", "answer", 0), forty_two),
NifEntry::dirty(Mfa::new("host", "thirteen", 0), thirteen),
]);
runtime.install_nifs(registration)?;
assert_eq!(
runtime.registered_nif_modules(),
vec!["aion_flow_ffi", "host"]
);
let answer = runtime.lookup_native_for_test("host", "answer", 0);
assert!(answer.is_some());
assert!(
runtime
.lookup_native_for_test("host", "thirteen", 0)
.is_some_and(|entry| entry.dirty_kind.is_some())
);
let host_nif_call = native_call_module_for_test(
runtime.atom_table.intern("host_nif_call"),
runtime.atom_table.intern("answer"),
runtime.atom_table.intern("host"),
runtime.atom_table.intern("answer"),
answer,
);
runtime.module_registry.insert(host_nif_call);
let pid = runtime.spawn_workflow("host_nif_call", "answer", RuntimeInput::default())?;
let (reason, result) = runtime.run_until_exit_for_test(pid);
assert_eq!(reason, beamr::process::ExitReason::Normal);
assert_eq!(result, Term::small_int(42));
runtime.shutdown()?;
Ok(())
}
}