#![cfg(all(unix, feature = "experimental-host-interrupt"))]
use std::{
sync::{
Arc, Barrier, Mutex,
atomic::{AtomicBool, Ordering},
},
thread,
time::Duration,
};
use anyhow::Result;
use wasmer::{
AsStoreMut, Exception, Function, FunctionEnv, Instance, Module, RuntimeError, Store, Tag,
imports,
};
use wasmer_vm::TrapCode;
/// WebAssembly text for a module that exports a function named `infinite`.
///
/// The function body is a single `loop` whose only instruction is `br 0`, i.e. an
/// unconditional branch back to the start of that loop, so the call never returns
/// by itself.
const INFINITE_LOOP_WAT: &str = r#"
(module
(func (export "infinite")
loop
br 0
end
)
)"#;
/// WebAssembly text for a module that declares a shared memory (`memory 1 1 shared`)
/// and exports a function named `infinite`.
///
/// The function pushes `i32.const 0`, `i32.const 0` and `i64.const -1`, executes
/// `memory.atomic.wait32` on them, and drops the instruction's result.
// NOTE(review): the three operands are understood to be (address, expected value,
// timeout), with a negative timeout meaning "wait without a timeout" per the
// WebAssembly threads proposal — confirm against the spec.
const INFINITE_ATOMIC_WAIT_WAT: &str = r#"
(module
(memory 1 1 shared)
(func (export "infinite")
i32.const 0
i32.const 0
i64.const -1
memory.atomic.wait32
drop
)
)"#;
/// A store running the busy-loop module must be interruptible from another thread.
#[test]
fn test_interrupt_hot_loop() -> Result<()> {
    let module_text = INFINITE_LOOP_WAT;
    test_interruptible(module_text)
}
/// A store whose guest is parked in `memory.atomic.wait32` must be interruptible
/// from another thread.
#[test]
fn test_interrupt_memory_wait() -> Result<()> {
    let module_text = INFINITE_ATOMIC_WAIT_WAT;
    test_interruptible(module_text)
}
/// Runs the `infinite` export of the module in `wat` on a worker thread, interrupts
/// the worker's store from the calling thread, and checks that the call ended with a
/// `TrapCode::HostInterrupt` trap.
///
/// The worker publishes its store's interrupter through a shared slot and then meets
/// the calling thread at a two-party barrier right before it starts the call.
///
/// The worker reaches the barrier even when parsing, compiling or instantiating the
/// module fails, so a setup error fails the test with that error instead of leaving
/// the calling thread blocked on the barrier forever.
///
/// # Panics
///
/// Panics if the worker panicked, if its setup failed, if the call returned
/// successfully, or if the resulting error is not a `HostInterrupt` trap.
fn test_interruptible(wat: &'static str) -> Result<()> {
    let barrier = Arc::new(Barrier::new(2));
    let interrupter_slot = Arc::new(Mutex::new(None));
    let worker = thread::spawn({
        let barrier = barrier.clone();
        let interrupter_slot = interrupter_slot.clone();
        move || {
            // Setup is fallible. Collect its outcome instead of returning early so
            // that the rendezvous below always happens.
            let setup = (|| {
                let wasm = wat::parse_str(wat)?;
                let mut store = Store::default();
                let interrupter = store.interrupter();
                interrupter_slot.lock().unwrap().replace(interrupter);
                let module = Module::new(&store, &wasm)?;
                let imports = imports! {};
                let instance = Instance::new(&mut store, &module, &imports)?;
                let f = instance
                    .exports
                    .get_typed_function::<(), ()>(&store, "infinite")?;
                // The instance is handed back as well so it stays alive for the call.
                anyhow::Ok((store, instance, f))
            })();
            barrier.wait();
            let (mut store, _instance, f) = setup?;
            anyhow::Ok(f.call(&mut store))
        }
    });
    barrier.wait();
    // Presumably gives the worker time to actually enter the wasm call.
    thread::sleep(Duration::from_millis(500));
    // The slot is only empty when the worker failed before creating its store; in
    // that case skip the interrupt and let `join` below report the worker's error.
    if let Some(interrupter) = interrupter_slot.lock().unwrap().as_ref() {
        interrupter.interrupt();
    }
    // join -> thread did not panic; unwrap -> setup succeeded; unwrap_err -> the
    // call itself must have failed.
    let result = worker.join().unwrap().unwrap().unwrap_err();
    assert_eq!(result.to_trap().unwrap(), TrapCode::HostInterrupt);
    Ok(())
}
/// Interrupting one store must not affect a different store that is running.
///
/// A worker thread runs the busy-loop module in its own store. The test thread first
/// interrupts an unrelated second store and asserts the worker is still running, then
/// interrupts the worker's store and asserts the call ended with a
/// `TrapCode::HostInterrupt` trap.
///
/// The worker reaches the barrier even when its setup fails, so a setup error fails
/// the test with that error instead of leaving the test thread blocked on the barrier.
#[test]
fn correct_store_is_interrupted_only() -> Result<()> {
    let barrier = Arc::new(Barrier::new(2));
    let finished = Arc::new(AtomicBool::new(false));
    let interrupter_slot = Arc::new(Mutex::new(None));
    let worker = thread::spawn({
        let barrier = barrier.clone();
        let finished = finished.clone();
        let interrupter_slot = interrupter_slot.clone();
        move || {
            // Setup is fallible. Collect its outcome instead of returning early so
            // that the rendezvous below always happens.
            let setup = (|| {
                let wasm = wat::parse_str(INFINITE_LOOP_WAT)?;
                let mut store = Store::default();
                let interrupter = store.interrupter();
                interrupter_slot.lock().unwrap().replace(interrupter);
                let module = Module::new(&store, &wasm)?;
                let imports = imports! {};
                let instance = Instance::new(&mut store, &module, &imports)?;
                let f = instance
                    .exports
                    .get_typed_function::<(), ()>(&store, "infinite")?;
                // The instance is handed back as well so it stays alive for the call.
                anyhow::Ok((store, instance, f))
            })();
            barrier.wait();
            let (mut store, _instance, f) = setup?;
            let res = f.call(&mut store);
            // Set only once the call has returned; the test thread polls this flag.
            finished.store(true, Ordering::SeqCst);
            anyhow::Ok(res)
        }
    });
    // An unrelated store that never runs any code.
    let store2 = Store::default();
    let interrupter2 = store2.interrupter();
    barrier.wait();
    thread::sleep(Duration::from_millis(500));
    // Interrupting the unrelated store must leave the worker's call running.
    interrupter2.interrupt();
    thread::sleep(Duration::from_millis(500));
    assert!(!finished.load(Ordering::SeqCst));
    // The slot is only empty when the worker failed before creating its store; in
    // that case skip the interrupt and let `join` below report the worker's error.
    if let Some(interrupter) = interrupter_slot.lock().unwrap().as_ref() {
        interrupter.interrupt();
    }
    let result = worker.join().unwrap().unwrap().unwrap_err();
    assert!(finished.load(Ordering::SeqCst));
    assert_eq!(result.to_trap().unwrap(), TrapCode::HostInterrupt);
    Ok(())
}
/// Once a store has been interrupted, installing it in the interrupt registry a
/// second time must be rejected with `InstallError::AlreadyInterrupted`.
#[test]
fn interrupted_store_cant_be_entered_again() -> Result<()> {
    use wasmer_vm::interrupt_registry::{self, InstallError};

    let store = Store::default();
    let id = store.id();

    // First installation succeeds; its guard is held across the interrupt and the
    // second installation attempt.
    let first_entry = interrupt_registry::install(id)?;
    interrupt_registry::interrupt(id)?;

    assert!(matches!(
        interrupt_registry::install(id),
        Err(InstallError::AlreadyInterrupted)
    ));

    drop(first_entry);
    Ok(())
}
/// Interrupt behaviour when the guest is inside an imported host function that
/// blocks on a channel and then returns normally.
#[test]
fn imported_functions_are_interrupted_correctly() -> Result<()> {
    test_imported_function_interrupt(|host_store, release| {
        // The host function blocks until the shared harness sends on the channel.
        let wait_for_release = move || {
            release.recv().unwrap();
        };
        Function::new_typed(host_store, wait_for_release)
    })
}
/// Interrupt behaviour when the guest is inside an imported host function that
/// blocks on a channel and then fails with a wasm exception instead of returning.
#[test]
fn imported_functions_are_interrupted_if_exception_is_thrown() -> Result<()> {
    test_imported_function_interrupt(|host_store, release| {
        let unit_env = FunctionEnv::new(host_store, ());
        Function::new_typed_with_env(
            host_store,
            &unit_env,
            move |mut ctx: wasmer::FunctionEnvMut<_>| {
                // Block until the shared harness sends on the channel.
                release.recv().unwrap();
                // Build a fresh tag and an exception with no payload, then return
                // it as the host function's error.
                let mut inner_store = ctx.as_store_mut();
                let tag = Tag::new(&mut inner_store, []);
                let exception = Exception::new(&mut inner_store, &tag, &[]);
                Err::<(), _>(RuntimeError::exception(&inner_store, exception))
            },
        )
    })
}
/// Shared harness for interrupting a store while the guest is inside an imported
/// host function.
///
/// `build_imported_function` receives the worker's store and the receiving end of a
/// channel, and returns the host function imported by the guest as `env.f`. The
/// guest's `infinite` export only calls that import.
///
/// The harness interrupts the store, asserts that the call has not finished 100 ms
/// later, then sends on the channel and asserts that the call ended with a
/// `TrapCode::HostInterrupt` trap.
///
/// The worker reaches the barrier even when its setup fails, and the channel send is
/// only checked after the worker has been joined. A setup error therefore fails the
/// test with that error, rather than hanging on the barrier or being hidden behind a
/// send failure on a channel whose receiver is already gone.
///
/// # Panics
///
/// Panics if the worker panicked, if its setup failed, if the call finished before
/// the channel message was sent, if the call returned successfully, or if the
/// resulting error is not a `HostInterrupt` trap.
fn test_imported_function_interrupt<F>(build_imported_function: F) -> Result<()>
where
    F: (FnOnce(&mut Store, crossbeam_channel::Receiver<()>) -> Function) + Send + Sync + 'static,
{
    let (tx, rx) = crossbeam_channel::bounded(1);
    let interrupter_slot = Arc::new(Mutex::new(None));
    let barrier = Arc::new(Barrier::new(2));
    let finished = Arc::new(AtomicBool::new(false));
    let worker = thread::spawn({
        let barrier = barrier.clone();
        let finished = finished.clone();
        let interrupter_slot = interrupter_slot.clone();
        move || {
            // Setup is fallible. Collect its outcome instead of returning early so
            // that the rendezvous below always happens.
            let setup = (|| {
                let wasm = wat::parse_str(
                    r#"
(module
(import "env" "f" (func $f))
(func (export "infinite")
call $f
)
)"#,
                )?;
                let mut store = Store::default();
                let interrupter = store.interrupter();
                interrupter_slot.lock().unwrap().replace(interrupter);
                let module = Module::new(&store, &wasm)?;
                let f = build_imported_function(&mut store, rx);
                let imports = imports! {
                    "env" => {
                        "f" => f
                    }
                };
                let instance = Instance::new(&mut store, &module, &imports)?;
                let f = instance
                    .exports
                    .get_typed_function::<(), ()>(&store, "infinite")?;
                // The instance is handed back as well so it stays alive for the call.
                anyhow::Ok((store, instance, f))
            })();
            barrier.wait();
            let (mut store, _instance, f) = setup?;
            let res = f.call(&mut store);
            // Set only once the call has returned; the test thread polls this flag.
            finished.store(true, Ordering::SeqCst);
            anyhow::Ok(res)
        }
    });
    barrier.wait();
    thread::sleep(Duration::from_millis(500));
    // The slot is only empty when the worker failed before creating its store; in
    // that case skip the interrupt and let `join` below report the worker's error.
    if let Some(interrupter) = interrupter_slot.lock().unwrap().as_ref() {
        interrupter.interrupt();
    }
    thread::sleep(Duration::from_millis(100));
    // The interrupt alone must not have ended the call.
    assert!(!finished.load(Ordering::SeqCst));
    // Send the message the host function is waiting for. The outcome is checked
    // after the join so that a worker-side error is reported first.
    let released = tx.send(());
    let result = worker.join().unwrap().unwrap().unwrap_err();
    released.unwrap();
    assert!(finished.load(Ordering::SeqCst));
    assert_eq!(result.to_trap().unwrap(), TrapCode::HostInterrupt);
    Ok(())
}