use crate::async_functions::{PollOnce, execute_across_threads};
use std::pin::Pin;
use std::task::{Context, Poll};
use wasmtime::Result;
use wasmtime::{AsContextMut, Config, Engine, Store, StoreContextMut, Trap, component::*};
use wasmtime_component_util::REALLOC_AND_FREE;
// Basic async smoke test: instantiate a component that exports a no-op
// function (`thunk`) and a function whose body is `unreachable`
// (`thunk-trap`), then call both through the async API. The first call must
// succeed and the second must surface `Trap::UnreachableCodeReached`.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn smoke() -> Result<()> {
    let wat = r#"
(component
(core module $m
(func (export "thunk"))
(func (export "thunk-trap") unreachable)
)
(core instance $i (instantiate $m))
(func (export "thunk")
(canon lift (core func $i "thunk"))
)
(func (export "thunk-trap")
(canon lift (core func $i "thunk-trap"))
)
)
"#;
    let engine = super::async_engine();
    let component = Component::new(&engine, wat)?;
    let mut store = Store::new(&engine, ());
    let linker = Linker::new(&engine);
    let instance = linker.instantiate_async(&mut store, &component).await?;

    // The plain export is expected to run to completion.
    instance
        .get_typed_func::<(), ()>(&mut store, "thunk")?
        .call_async(&mut store, ())
        .await?;

    // The trapping export must report the `unreachable` trap to the caller.
    let trapping = instance.get_typed_func::<(), ()>(&mut store, "thunk-trap")?;
    let err = trapping.call_async(&mut store, ()).await.unwrap_err();
    assert_eq!(err.downcast::<Trap>()?, Trap::UnreachableCodeReached);
    Ok(())
}
// Checks that a host function registered with `func_wrap_async` can be
// imported by a component (as `i`), lowered into a core module, and invoked
// when the guest export `thunk` is called asynchronously.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn smoke_func_wrap() -> Result<()> {
    let wat = r#"
(component
(type $f (func))
(import "i" (func $f))
(core module $m
(import "imports" "i" (func $i))
(func (export "thunk") call $i)
)
(core func $f (canon lower (func $f)))
(core instance $i (instantiate $m
(with "imports" (instance
(export "i" (func $f))
))
))
(func (export "thunk")
(canon lift (core func $i "thunk"))
)
)
"#;
    let engine = super::async_engine();
    let mut store = Store::new(&engine, ());
    let component = Component::new(&engine, wat)?;

    // Provide the `i` import as an async host function that does nothing.
    let mut linker = Linker::new(&engine);
    linker
        .root()
        .func_wrap_async("i", |_: StoreContextMut<()>, _: ()| {
            Box::new(async { Ok(()) })
        })?;

    let instance = linker.instantiate_async(&mut store, &component).await?;
    instance
        .get_typed_func::<(), ()>(&mut store, "thunk")?
        .call_async(&mut store, ())
        .await?;
    Ok(())
}
// Instantiates a component whose start function calls an async host import
// (`yield`) returning a `list u8`. The host import yields to the tokio
// scheduler before returning, fuel-based async yielding is enabled with an
// interval of 1, and the whole instantiation future is driven through
// `execute_across_threads`.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn resume_separate_thread() -> Result<()> {
    let mut config = wasmtime_test_util::component::config();
    config.consume_fuel(true);
    let engine = Engine::new(&config)?;

    let wat = format!(
        r#"
(component
(import "yield" (func $yield (result (list u8))))
(core module $libc
(memory (export "memory") 1)
{REALLOC_AND_FREE}
)
(core instance $libc (instantiate $libc))
(core func $yield
(canon lower
(func $yield)
(memory $libc "memory")
(realloc (func $libc "realloc"))
)
)
(core module $m
(import "" "yield" (func $yield (param i32)))
(import "libc" "memory" (memory 0))
(func $start
i32.const 8
call $yield
)
(start $start)
)
(core instance (instantiate $m
(with "" (instance (export "yield" (func $yield))))
(with "libc" (instance $libc))
))
)
"#
    );
    let component = Component::new(&engine, wat)?;

    // Host side of `yield`: suspend once, then hand back a two-byte list.
    let mut linker = Linker::new(&engine);
    linker
        .root()
        .func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| {
            Box::new(async {
                tokio::task::yield_now().await;
                Ok((vec![1u8, 2u8],))
            })
        })?;

    // Instantiation runs the start function, which calls into the host.
    let instantiate = async move {
        let mut store = Store::new(&engine, ());
        store.set_fuel(u64::MAX).unwrap();
        store.fuel_async_yield_interval(Some(1)).unwrap();
        linker.instantiate_async(&mut store, &component).await?;
        Ok::<_, wasmtime::Error>(())
    };
    execute_across_threads(instantiate).await?;
    Ok(())
}
// Builds a future that instantiates a component and calls its `run` export
// (with fuel-based async yielding at interval 1), stores that future in the
// data of a second store, and then repeatedly polls it exactly once per call
// from inside an async host function (`poll_once`). The host function
// returns 1 once the inner future has completed and 0 otherwise.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn poll_through_wasm_activation() -> Result<()> {
    let mut config = wasmtime_test_util::component::config();
    config.consume_fuel(true);
    let engine = Engine::new(&config)?;

    let wat = format!(
        r#"
(component
(core module $m
{REALLOC_AND_FREE}
(memory (export "memory") 1)
(func (export "run") (param i32 i32)
)
)
(core instance $i (instantiate $m))
(func (export "run") (param "x" (list u8))
(canon lift (core func $i "run")
(memory $i "memory")
(realloc (func $i "realloc"))))
)
"#
    );
    let component = Component::new(&engine, wat)?;
    let linker = Linker::new(&engine);

    // The inner workload: its own store, instantiate, call `run` once.
    let invoke_component = {
        let engine = engine.clone();
        async move {
            let mut store = Store::new(&engine, ());
            store.set_fuel(u64::MAX).unwrap();
            store.fuel_async_yield_interval(Some(1)).unwrap();
            let instance = linker.instantiate_async(&mut store, &component).await?;
            let run = instance.get_typed_func::<(Vec<u8>,), ()>(&mut store, "run")?;
            run.call_async(&mut store, (vec![1, 2, 3],)).await?;
            Ok::<_, wasmtime::Error>(())
        }
    };

    let driver = async move {
        // The outer store owns the (pinned, boxed) inner future as its data.
        let mut store = Store::new(&engine, Some(Box::pin(invoke_component)));
        let poll_once = wasmtime::Func::wrap_async(&mut store, |mut cx, _: ()| {
            // Take the future out of the store for the duration of the poll.
            let pending = cx.data_mut().take().unwrap();
            Box::new(async move {
                let polled = PollOnce::new(pending).await;
                match polled {
                    Ok(finished) => {
                        finished?;
                        Ok(1)
                    }
                    Err(still_pending) => {
                        // Not done yet: put it back for the next call.
                        *cx.data_mut() = Some(still_pending);
                        Ok(0)
                    }
                }
            })
        });
        let poll_once = poll_once.typed::<(), i32>(&mut store)?;

        // Keep calling until the host function reports completion.
        loop {
            if poll_once.call_async(&mut store, ()).await? == 1 {
                break;
            }
        }
        Ok::<_, wasmtime::Error>(())
    };
    execute_across_threads(driver).await?;
    Ok(())
}
// Verifies that an async resource destructor registered via
// `resource_async` runs to completion, including across an await point,
// when the guest drops an owned host resource through `resource.drop`.
// The destructor records its progress in a shared string which is checked
// after the call returns.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn drop_resource_async() -> Result<()> {
    use std::sync::{Arc, Mutex};

    // Marker type identifying the host resource.
    struct MyType;

    let engine = super::async_engine();
    let component = Component::new(
        &engine,
        r#"
(component
(import "t" (type $t (sub resource)))
(core func $drop (canon resource.drop $t))
(core module $m
(import "" "drop" (func $drop (param i32)))
(func (export "f") (param i32)
(call $drop (local.get 0))
)
)
(core instance $i (instantiate $m
(with "" (instance
(export "drop" (func $drop))
))
))
(func (export "f") (param "x" (own $t))
(canon lift (core func $i "f")))
)
"#,
    )?;

    let mut store = Store::new(&engine, ());
    let mut linker = Linker::new(&engine);

    // Shared progress marker updated by the destructor.
    let drop_status = Arc::new(Mutex::new("not dropped"));
    let status = Arc::clone(&drop_status);
    linker
        .root()
        .resource_async("t", ResourceType::host::<MyType>(), move |_, _| {
            let status = status.clone();
            Box::new(async move {
                *status.lock().unwrap() = "before yield";
                tokio::task::yield_now().await;
                *status.lock().unwrap() = "after yield";
                Ok(())
            })
        })?;

    let instance = linker.instantiate_async(&mut store, &component).await?;
    let func = instance.get_typed_func::<(Resource<MyType>,), ()>(&mut store, "f")?;

    // Pass an owned resource to the guest, which immediately drops it.
    let call = async move {
        let resource = Resource::new_own(100);
        func.call_async(&mut store, (resource,)).await?;
        Ok::<_, wasmtime::Error>(())
    };
    execute_across_threads(call).await?;

    assert_eq!("after yield", *drop_status.lock().unwrap());
    Ok(())
}
// Exercises task teardown when a task still has an extra guest thread
// attached at the time it returns.
//
// Component `$C` exports functions that each spawn a second thread with
// `thread.new-indirect` and then return 42. The spawned thread either calls
// `task.return` itself (`calls-return`), suspends forever (`suspends`), or
// spins in a `thread.yield` loop (`yield-loops`). Each scenario is lifted
// as sync, async stackful, and async stackless (callback) where applicable.
// Component `$D` calls every export of `$C` through both sync and async
// lowerings and checks each result is 42. The host then calls `run` from
// `$D` and every export of `$C` directly.
//
// The component is round-tripped through `serialize`/`deserialize` before
// being instantiated.
//
// NOTE: `explicit-thread-yield-loops-stackless` spawns its thread from
// `$yield-loop-ftbl-idx`, matching the sync and stackful yield-loop
// variants. It previously used `$suspend-ftbl-idx`, which made it a
// duplicate of the suspend scenario.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn task_deletion() -> Result<()> {
    let mut config = Config::new();
    config.wasm_component_model_async(true);
    config.wasm_component_model_threading(true);
    config.wasm_component_model_async_stackful(true);
    config.wasm_component_model_async_builtins(true);
    let engine = Engine::new(&config)?;
    let serialized = Component::new(
        &engine,
        r#"(component
(component $C
(core module $Memory (memory (export "mem") 1))
(core instance $memory (instantiate $Memory))
;; Defines the table for the thread start functions
(core module $libc
(table (export "__indirect_function_table") 3 funcref))
(core module $CM
(import "" "mem" (memory 1))
(import "" "task.return" (func $task-return (param i32)))
(import "" "task.cancel" (func $task-cancel))
(import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
(import "" "thread.suspend" (func $thread-suspend (result i32)))
(import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32)))
(import "" "thread.yield-to-suspended" (func $thread-yield-to-suspended (param i32) (result i32)))
(import "" "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable (param i32) (result i32)))
(import "" "thread.suspend-to" (func $thread-suspend-to (param i32) (result i32)))
(import "" "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable (param i32) (result i32)))
(import "" "thread.yield" (func $thread-yield (result i32)))
(import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32)))
(import "" "thread.index" (func $thread-index (result i32)))
(import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
(import "libc" "__indirect_function_table" (table $indirect-function-table 3 funcref))
;; Indices into the function table for the thread start functions
(global $call-return-ftbl-idx i32 (i32.const 0))
(global $suspend-ftbl-idx i32 (i32.const 1))
(global $yield-loop-ftbl-idx i32 (i32.const 2))
(func $call-return (param i32)
(call $task-return (local.get 0)))
(func $suspend (param i32)
(drop (call $thread-suspend)))
(func $yield-loop (param i32)
(loop $top
(drop (call $thread-yield))
(br $top)))
(func (export "explicit-thread-calls-return-stackful")
(call $thread-unsuspend
(call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42))))
(func (export "explicit-thread-calls-return-stackless") (result i32)
(call $thread-unsuspend
(call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42)))
(i32.const 0 (; EXIT ;)))
(func (export "cb") (param i32 i32 i32) (result i32)
(unreachable))
(func (export "explicit-thread-suspends-sync") (result i32)
(call $thread-unsuspend
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
(i32.const 42))
(func (export "explicit-thread-suspends-stackful")
(call $thread-unsuspend
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
(call $task-return (i32.const 42)))
(func (export "explicit-thread-suspends-stackless") (result i32)
(call $thread-unsuspend
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
(call $task-return (i32.const 42))
(i32.const 0))
(func (export "explicit-thread-yield-loops-sync") (result i32)
(call $thread-unsuspend
(call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
(i32.const 42))
(func (export "explicit-thread-yield-loops-stackful")
(call $thread-unsuspend
(call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
(call $task-return (i32.const 42)))
(func (export "explicit-thread-yield-loops-stackless") (result i32)
(call $thread-unsuspend
(call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
(call $task-return (i32.const 42))
(i32.const 0 (; EXIT ;)))
;; Initialize the function table that will be used by thread.new-indirect
(elem (table $indirect-function-table) (i32.const 0 (; call-return-ftbl-idx ;)) func $call-return)
(elem (table $indirect-function-table) (i32.const 1 (; suspend-ftbl-idx ;)) func $suspend)
(elem (table $indirect-function-table) (i32.const 2 (; yield-loop-ftbl-idx ;)) func $yield-loop)
)
;; Instantiate the libc module to get the table
(core instance $libc (instantiate $libc))
;; Get access to `thread.new-indirect` that uses the table from libc
(core type $start-func-ty (func (param i32)))
(alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
(core func $task-return (canon task.return (result u32)))
(core func $task-cancel (canon task.cancel))
(core func $thread-new-indirect
(canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
(core func $thread-yield (canon thread.yield))
(core func $thread-yield-cancellable (canon thread.yield cancellable))
(core func $thread-index (canon thread.index))
(core func $thread-yield-to-suspended (canon thread.yield-to-suspended))
(core func $thread-yield-to-suspended-cancellable (canon thread.yield-to-suspended cancellable))
(core func $thread-unsuspend (canon thread.unsuspend))
(core func $thread-suspend-to (canon thread.suspend-to))
(core func $thread-suspend-to-cancellable (canon thread.suspend-to cancellable))
(core func $thread-suspend (canon thread.suspend))
(core func $thread-suspend-cancellable (canon thread.suspend cancellable))
(core func $waitable-set.new (canon waitable-set.new))
(core func $waitable.join (canon waitable.join))
(core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
;; Instantiate the main module
(core instance $cm (
instantiate $CM
(with "" (instance
(export "mem" (memory $memory "mem"))
(export "task.return" (func $task-return))
(export "task.cancel" (func $task-cancel))
(export "thread.new-indirect" (func $thread-new-indirect))
(export "thread.index" (func $thread-index))
(export "thread.yield-to-suspended" (func $thread-yield-to-suspended))
(export "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable))
(export "thread.yield" (func $thread-yield))
(export "thread.yield-cancellable" (func $thread-yield-cancellable))
(export "thread.suspend-to" (func $thread-suspend-to))
(export "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable))
(export "thread.suspend" (func $thread-suspend))
(export "thread.suspend-cancellable" (func $thread-suspend-cancellable))
(export "thread.unsuspend" (func $thread-unsuspend))
(export "waitable.join" (func $waitable.join))
(export "waitable-set.wait" (func $waitable-set.wait))
(export "waitable-set.new" (func $waitable-set.new))))
(with "libc" (instance $libc))))
(func (export "explicit-thread-calls-return-stackful") async (result u32)
(canon lift (core func $cm "explicit-thread-calls-return-stackful") async))
(func (export "explicit-thread-calls-return-stackless") async (result u32)
(canon lift (core func $cm "explicit-thread-calls-return-stackless") async (callback (func $cm "cb"))))
(func (export "explicit-thread-suspends-sync") async (result u32)
(canon lift (core func $cm "explicit-thread-suspends-sync")))
(func (export "explicit-thread-suspends-stackful") async (result u32)
(canon lift (core func $cm "explicit-thread-suspends-stackful") async))
(func (export "explicit-thread-suspends-stackless") async (result u32)
(canon lift (core func $cm "explicit-thread-suspends-stackless") async (callback (func $cm "cb"))))
(func (export "explicit-thread-yield-loops-sync") async (result u32)
(canon lift (core func $cm "explicit-thread-yield-loops-sync")))
(func (export "explicit-thread-yield-loops-stackful") async (result u32)
(canon lift (core func $cm "explicit-thread-yield-loops-stackful") async))
(func (export "explicit-thread-yield-loops-stackless") async (result u32)
(canon lift (core func $cm "explicit-thread-yield-loops-stackless") async (callback (func $cm "cb"))))
)
(component $D
(import "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful async (result u32)))
(import "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless async (result u32)))
(import "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync async (result u32)))
(import "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful async (result u32)))
(import "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless async (result u32)))
(import "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync async (result u32)))
(import "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful async (result u32)))
(import "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless async (result u32)))
(core module $Memory (memory (export "mem") 1))
(core instance $memory (instantiate $Memory))
(core module $DM
(import "" "mem" (memory 1))
(import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
;; sync lowered
(import "" "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful (result i32)))
(import "" "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless (result i32)))
(import "" "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync (result i32)))
(import "" "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful (result i32)))
(import "" "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless (result i32)))
(import "" "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync (result i32)))
(import "" "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful (result i32)))
(import "" "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless (result i32)))
;; async lowered
(import "" "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async (param i32) (result i32)))
(import "" "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async (param i32) (result i32)))
(import "" "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async (param i32) (result i32)))
(import "" "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async (param i32) (result i32)))
(import "" "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async (param i32) (result i32)))
(import "" "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async (param i32) (result i32)))
(import "" "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async (param i32) (result i32)))
(import "" "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async (param i32) (result i32)))
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
(import "" "thread.yield" (func $thread-yield (result i32)))
(func $check (param i32)
(if (i32.ne (local.get 0) (i32.const 42))
(then unreachable))
)
(func $check-async (param i32)
(local $retp i32) (local $ws i32) (local $ws-retp i32)
(local.set $retp (i32.const 8))
(local.set $ws-retp (i32.const 16))
(local.set $ws (call $waitable-set.new))
(if (i32.eq (i32.and (local.get 0) (i32.const 0xF)) (i32.const 2 (; RETURNED ;)))
(then (call $check (i32.load (local.get $retp))))
(else
(call $waitable.join (i32.shr_u (local.get 0) (i32.const 4)) (local.get $ws))
(drop (call $waitable-set.wait (local.get $ws) (local.get $ws-retp)))
(call $check (i32.load (local.get $retp)))))
)
(func $run (export "run") (result i32)
(local $retp i32)
(local.set $retp (i32.const 8))
(call $check (call $explicit-thread-calls-return-stackless))
(call $check (call $explicit-thread-calls-return-stackful))
(call $check (call $explicit-thread-suspends-sync))
(call $check (call $explicit-thread-suspends-stackful))
(call $check (call $explicit-thread-suspends-stackless))
(call $check (call $explicit-thread-yield-loops-sync))
(call $check (call $explicit-thread-yield-loops-stackful))
(call $check (call $explicit-thread-yield-loops-stackless))
(call $check-async (call $explicit-thread-calls-return-stackless-async (local.get $retp)))
(call $check-async (call $explicit-thread-calls-return-stackful-async (local.get $retp)))
(call $check-async (call $explicit-thread-suspends-sync-async (local.get $retp)))
(call $check-async (call $explicit-thread-suspends-stackful-async (local.get $retp)))
(call $check-async (call $explicit-thread-suspends-stackless-async (local.get $retp)))
(call $check-async (call $explicit-thread-yield-loops-sync-async (local.get $retp)))
(call $check-async (call $explicit-thread-yield-loops-stackful-async (local.get $retp)))
(call $check-async (call $explicit-thread-yield-loops-stackless-async (local.get $retp)))
(i32.const 42)
)
)
(core func $waitable-set.new (canon waitable-set.new))
(core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
(core func $waitable.join (canon waitable.join))
(core func $subtask.cancel (canon subtask.cancel async))
(core func $thread.yield (canon thread.yield))
;; sync lowered
(canon lower (func $explicit-thread-calls-return-stackful) (memory $memory "mem") (core func $explicit-thread-calls-return-stackful'))
(canon lower (func $explicit-thread-calls-return-stackless) (memory $memory "mem") (core func $explicit-thread-calls-return-stackless'))
(canon lower (func $explicit-thread-suspends-sync) (memory $memory "mem") (core func $explicit-thread-suspends-sync'))
(canon lower (func $explicit-thread-suspends-stackful) (memory $memory "mem") (core func $explicit-thread-suspends-stackful'))
(canon lower (func $explicit-thread-suspends-stackless) (memory $memory "mem") (core func $explicit-thread-suspends-stackless'))
(canon lower (func $explicit-thread-yield-loops-sync) (memory $memory "mem") (core func $explicit-thread-yield-loops-sync'))
(canon lower (func $explicit-thread-yield-loops-stackful) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful'))
(canon lower (func $explicit-thread-yield-loops-stackless) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless'))
;; async lowered
(canon lower (func $explicit-thread-calls-return-stackful) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackful-async'))
(canon lower (func $explicit-thread-calls-return-stackless) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackless-async'))
(canon lower (func $explicit-thread-suspends-sync) async (memory $memory "mem") (core func $explicit-thread-suspends-sync-async'))
(canon lower (func $explicit-thread-suspends-stackful) async (memory $memory "mem") (core func $explicit-thread-suspends-stackful-async'))
(canon lower (func $explicit-thread-suspends-stackless) async (memory $memory "mem") (core func $explicit-thread-suspends-stackless-async'))
(canon lower (func $explicit-thread-yield-loops-sync) async (memory $memory "mem") (core func $explicit-thread-yield-loops-sync-async'))
(canon lower (func $explicit-thread-yield-loops-stackful) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful-async'))
(canon lower (func $explicit-thread-yield-loops-stackless) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless-async'))
(core instance $dm (instantiate $DM (with "" (instance
(export "mem" (memory $memory "mem"))
(export "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful'))
(export "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless'))
(export "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync'))
(export "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful'))
(export "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless'))
(export "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync'))
(export "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful'))
(export "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless'))
(export "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async'))
(export "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async'))
(export "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async'))
(export "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async'))
(export "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async'))
(export "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async'))
(export "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async'))
(export "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async'))
(export "waitable.join" (func $waitable.join))
(export "waitable-set.new" (func $waitable-set.new))
(export "waitable-set.wait" (func $waitable-set.wait))
(export "subtask.cancel" (func $subtask.cancel))
(export "thread.yield" (func $thread.yield))
))))
(func (export "run") async (result u32) (canon lift (core func $dm "run")))
)
(instance $c (instantiate $C))
(instance $d (instantiate $D
(with "explicit-thread-calls-return-stackful" (func $c "explicit-thread-calls-return-stackful"))
(with "explicit-thread-calls-return-stackless" (func $c "explicit-thread-calls-return-stackless"))
(with "explicit-thread-suspends-sync" (func $c "explicit-thread-suspends-sync"))
(with "explicit-thread-suspends-stackful" (func $c "explicit-thread-suspends-stackful"))
(with "explicit-thread-suspends-stackless" (func $c "explicit-thread-suspends-stackless"))
(with "explicit-thread-yield-loops-sync" (func $c "explicit-thread-yield-loops-sync"))
(with "explicit-thread-yield-loops-stackful" (func $c "explicit-thread-yield-loops-stackful"))
(with "explicit-thread-yield-loops-stackless" (func $c "explicit-thread-yield-loops-stackless"))
))
(func (export "run") (alias export $d "run"))
(func (export "explicit-thread-calls-return-stackful") (alias export $c "explicit-thread-calls-return-stackful"))
(func (export "explicit-thread-calls-return-stackless") (alias export $c "explicit-thread-calls-return-stackless"))
(func (export "explicit-thread-suspends-sync") (alias export $c "explicit-thread-suspends-sync"))
(func (export "explicit-thread-suspends-stackful") (alias export $c "explicit-thread-suspends-stackful"))
(func (export "explicit-thread-suspends-stackless") (alias export $c "explicit-thread-suspends-stackless"))
(func (export "explicit-thread-yield-loops-sync") (alias export $c "explicit-thread-yield-loops-sync"))
(func (export "explicit-thread-yield-loops-stackful") (alias export $c "explicit-thread-yield-loops-stackful"))
(func (export "explicit-thread-yield-loops-stackless") (alias export $c "explicit-thread-yield-loops-stackless"))
)
"#,
    )?
    .serialize()?;

    // SAFETY: `serialized` was produced just above by `Component::serialize`
    // on a component compiled with this same `engine`, and the bytes have
    // not been modified in between.
    let component = unsafe { Component::deserialize(&engine, &serialized)? };

    let mut store = Store::new(&engine, ());
    let instance = Linker::new(&engine)
        .instantiate_async(&mut store, &component)
        .await?;

    // `run` drives every scenario guest-to-guest; the remaining names call
    // each scenario directly from the host. All of them must yield 42.
    let exports = [
        "run",
        "explicit-thread-calls-return-stackful",
        "explicit-thread-calls-return-stackless",
        "explicit-thread-suspends-sync",
        "explicit-thread-suspends-stackful",
        "explicit-thread-suspends-stackless",
        "explicit-thread-yield-loops-sync",
        "explicit-thread-yield-loops-stackful",
        "explicit-thread-yield-loops-stackless",
    ];
    for export in exports {
        let func = instance.get_typed_func::<(), (u32,)>(&mut store, export)?;
        assert_eq!(
            func.call_async(&mut store, ()).await?,
            (42,),
            "unexpected result from export `{export}`"
        );
    }
    Ok(())
}
// Passes a host-backed `future<u32>` to a guest which twice starts a read on
// it (expecting BLOCKED) and then cancels that read (expecting CANCELLED).
// The host producer never yields a value: it stays pending until asked to
// finish, at which point it reports that no item was produced.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn cancel_host_future() -> Result<()> {
    // Host producer backing the future handed to the guest.
    struct MyFutureReader;

    impl FutureProducer<()> for MyFutureReader {
        type Item = u32;

        fn poll_produce(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _store: StoreContextMut<()>,
            finish: bool,
        ) -> Poll<Result<Option<Self::Item>>> {
            // Only resolve (with no item) when `finish` is set; otherwise
            // remain pending.
            if !finish {
                return Poll::Pending;
            }
            Poll::Ready(Ok(None))
        }
    }

    let mut config = Config::new();
    config.wasm_component_model_async(true);
    let engine = Engine::new(&config)?;
    let component = Component::new(
        &engine,
        r#"
(component
(core module $libc (memory (export "memory") 1))
(core instance $libc (instantiate $libc))
(core module $m
(import "" "future.read" (func $future.read (param i32 i32) (result i32)))
(import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32)))
(memory (export "memory") 1)
(func (export "run") (param i32)
;; read/cancel attempt 1
(call $future.read (local.get 0) (i32.const 100))
i32.const -1 ;; BLOCKED
i32.ne
if unreachable end
(call $future.cancel-read (local.get 0))
i32.const 2 ;; CANCELLED
i32.ne
if unreachable end
;; read/cancel attempt 2
(call $future.read (local.get 0) (i32.const 100))
i32.const -1 ;; BLOCKED
i32.ne
if unreachable end
(call $future.cancel-read (local.get 0))
i32.const 2 ;; CANCELLED
i32.ne
if unreachable end
)
)
(type $f (future u32))
(core func $future.read (canon future.read $f async (memory $libc "memory")))
(core func $future.cancel-read (canon future.cancel-read $f))
(core instance $i (instantiate $m
(with "" (instance
(export "future.read" (func $future.read))
(export "future.cancel-read" (func $future.cancel-read))
))
))
(func (export "run") async (param "f" $f)
(canon lift
(core func $i "run")
(memory $libc "memory")
)
)
)
"#,
    )?;

    let mut store = Store::new(&engine, ());
    let linker = Linker::new(&engine);
    let instance = linker.instantiate_async(&mut store, &component).await?;
    let run = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?;

    // Wrap the host producer in a reader handle and hand it to the guest.
    let reader = FutureReader::new(&mut store, MyFutureReader)?;
    run.call_async(&mut store, (reader,)).await?;
    Ok(())
}
// Checks that a concurrent host function can itself call back into wasm.
// Component `a` imports host function `a` and calls it from its `run`
// export. The host function looks up the `run` export of a second instance
// (component `b`, stored in the store's data) and invokes it with
// `call_concurrent`, all while the outer call was started via `call_async`.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn run_wasm_in_call_async() -> Result<()> {
    // Store data: the instance of component `b`, once it exists.
    type State = Option<Instance>;

    let _ = env_logger::try_init();

    let mut config = Config::new();
    config.wasm_component_model_async(true);
    let engine = Engine::new(&config)?;

    // Component whose `run` calls the host import `a`.
    let a = Component::new(
        &engine,
        r#"
(component
(type $t (func async))
(import "a" (func $f (type $t)))
(core func $f (canon lower (func $f)))
(core module $a
(import "" "f" (func $f))
(func (export "run") call $f)
)
(core instance $a (instantiate $a
(with "" (instance (export "f" (func $f))))
))
(func (export "run") (type $t)
(canon lift (core func $a "run")))
)
"#,
    )?;

    // Component with a no-op `run`, called from inside the host import.
    let b = Component::new(
        &engine,
        r#"
(component
(type $t (func async))
(core module $a
(func (export "run"))
)
(core instance $a (instantiate $a))
(func (export "run") (type $t)
(canon lift (core func $a "run")))
)
"#,
    )?;

    let mut linker = Linker::new(&engine);
    linker
        .root()
        .func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| {
            Box::pin(async move {
                // Resolve `run` on the instance stashed in the store data.
                let inner_run = accessor.with(|mut access| {
                    let target = access.get().unwrap();
                    target.get_typed_func::<(), ()>(&mut access, "run")
                })?;
                inner_run.call_concurrent(accessor, ()).await?;
                Ok(())
            })
        })?;

    let mut store = Store::new(&engine, None);
    let instance_a = linker.instantiate_async(&mut store, &a).await?;
    let instance_b = linker.instantiate_async(&mut store, &b).await?;
    *store.data_mut() = Some(instance_b);

    instance_a
        .get_typed_func::<(), ()>(&mut store, "run")?
        .call_async(&mut store, ())
        .await?;
    Ok(())
}
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn require_concurrency_support() -> Result<()> {
let mut config = Config::new();
config.concurrency_support(false);
let engine = Engine::new(&config)?;
let mut store = Store::new(&engine, ());
assert!(
store
.run_concurrent(async |_| wasmtime::error::Ok(()))
.await
.is_err()
);
assert!(StreamReader::<u32>::new(&mut store, Vec::new()).is_err());
assert!(FutureReader::new(&mut store, async { wasmtime::error::Ok(0) }).is_err());
let mut linker = Linker::<()>::new(&engine);
let mut root = linker.root();
assert!(
root.func_wrap_concurrent::<(), (), _>("f1", |_, _| { todo!() })
.is_err()
);
assert!(
root.func_new_concurrent("f2", |_, _, _, _| { todo!() })
.is_err()
);
assert!(
root.resource_concurrent("f3", ResourceType::host::<u32>(), |_, _| { todo!() })
.is_err()
);
Ok(())
}
// The guest starts an async-lowered call to a host function that never
// completes, asserts the subtask is STARTED, cancels it (expecting
// RETURN_CANCELLED), and drops the subtask handle. After the guest call
// finishes and the event loop has been given a few extra turns, the store's
// concurrent state is asserted to be empty.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn cancel_host_task_does_not_leak() -> Result<()> {
    let mut config = Config::new();
    config.wasm_component_model_async(true);
    let engine = Engine::new(&config)?;
    let mut store = Store::new(&engine, ());
    let component = Component::new(
        &engine,
        r#"(component
(import "f" (func $f async))
(core module $m
(import "" "f" (func $f (result i32)))
(import "" "cancel" (func $cancel (param i32) (result i32)))
(import "" "drop" (func $drop (param i32)))
(func (export "run")
(local i32)
;; start the subtask, asserting it's `STARTED`
call $f
local.tee 0
i32.const 0xf
i32.and
i32.const 1 ;; STARTED
i32.ne
if unreachable end
;; extract the task id
local.get 0
i32.const 4
i32.shr_u
local.set 0
;; cancel the subtask asserting it's `RETURN_CANCELLED`
local.get 0
call $cancel
i32.const 4 ;; RETURN_CANCELLED
i32.ne
if unreachable end
;; drop the subtask
local.get 0
call $drop
)
)
(core func $f (canon lower (func $f) async))
(core func $cancel (canon subtask.cancel))
(core func $drop (canon subtask.drop))
(core instance $i (instantiate $m
(with "" (instance
(export "f" (func $f))
(export "cancel" (func $cancel))
(export "drop" (func $drop))
))
))
(func (export "f") async
(canon lift (core func $i "run")))
)"#,
    )?;

    // Host import that stays pending forever, so the guest must cancel it.
    let mut linker = Linker::new(&engine);
    linker.root().func_wrap_concurrent("f", |_, ()| {
        Box::pin(async move {
            std::future::pending::<()>().await;
            Ok(())
        })
    })?;

    let instance = linker.instantiate_async(&mut store, &component).await?;
    let guest_f = instance.get_typed_func::<(), ()>(&mut store, "f")?;

    store
        .run_concurrent(async |accessor| -> wasmtime::Result<()> {
            guest_f.call_concurrent(accessor, ()).await?;
            // Yield a few more times after the guest call has returned
            // before leaving the concurrent scope.
            for _ in 0..5 {
                tokio::task::yield_now().await;
            }
            Ok(())
        })
        .await??;

    store.assert_concurrent_state_empty();
    Ok(())
}
/// The guest calls a concurrent host function 100 times through a synchronous
/// lower. The host records the largest concurrent-state table size it sees in
/// the store data; the test asserts the state ends up empty and that the
/// recorded peak stayed below 100.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn sync_lower_async_host_does_not_leak() -> Result<()> {
    let wat = r#"(component
(import "f" (func $f async))
(core module $m
(import "" "f" (func $f))
(func (export "run")
(local $c i32)
;; call the host 100 times
loop
call $f
(local.tee $c (i32.add (local.get $c) (i32.const 1)))
i32.const 100
i32.ne
if br 1 end
end
)
)
(core func $f (canon lower (func $f) ))
(core instance $i (instantiate $m
(with "" (instance
(export "f" (func $f))
))
))
(func (export "f") async
(canon lift (core func $i "run")))
)"#;

    let mut cfg = Config::new();
    cfg.wasm_component_model_async(true);
    let engine = Engine::new(&cfg)?;
    // The store data holds the peak table size observed by the host function.
    let mut store = Store::new(&engine, 0);
    let component = Component::new(&engine, wat)?;

    let mut linker = Linker::<usize>::new(&engine);
    let mut imports = linker.root();
    imports.func_wrap_concurrent("f", |accessor, (): ()| {
        Box::pin(async move {
            // Yield a few times before sampling the table size.
            for _round in 0..5 {
                tokio::task::yield_now().await;
            }
            accessor.with(|mut access| {
                let table_size = access.as_context_mut().concurrent_state_table_size();
                let peak = access.data_mut();
                if table_size > *peak {
                    *peak = table_size;
                }
            });
            Ok(())
        })
    })?;

    let instance = linker.instantiate_async(&mut store, &component).await?;
    let run = instance.get_typed_func::<(), ()>(&mut store, "f")?;
    run.call_async(&mut store, ()).await?;

    store.assert_concurrent_state_empty();
    let peak = *store.data();
    assert!(
        peak < 100,
        "the store peaked at over 100 items in the concurrent table which \
         indicates that something isn't getting cleaned up between executions \
         of the host"
    );
    Ok(())
}
/// The guest issues an async `stream.read` on a host stream that never
/// produces data, then an async `stream.cancel-read`; if the cancel reports
/// BLOCKED it waits on a waitable set, and finally drops the stream. The test
/// passes when the exported call completes without an error.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn stream_cancel_read_async_does_not_corrupt_state() -> Result<()> {
    /// Host stream producer that never writes an item: it stays pending until
    /// asked to finish, at which point it reports cancellation.
    struct NeverWriteStreamProducer;

    impl StreamProducer<()> for NeverWriteStreamProducer {
        type Item = u8;
        type Buffer = Option<u8>;

        fn poll_produce<'a>(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _store: StoreContextMut<'a, ()>,
            _destination: Destination<'a, Self::Item, Self::Buffer>,
            finish: bool,
        ) -> Poll<Result<StreamResult>> {
            if !finish {
                return Poll::Pending;
            }
            Poll::Ready(Ok(StreamResult::Cancelled))
        }
    }

    // Logger initialization is best-effort; its result is deliberately ignored.
    let _ = env_logger::try_init();

    let mut cfg = Config::new();
    cfg.wasm_component_model_async(true);
    cfg.wasm_component_model_async_builtins(true);
    cfg.wasm_component_model_async_stackful(true);
    let engine = Engine::new(&cfg)?;

    let wat = r#"
(component
(core module $libc (memory (export "memory") 1))
(core instance $libc (instantiate $libc))
(core module $m
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
(import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
(memory (export "memory") 1)
(func (export "run") (param $sr i32)
(local $cancel_result i32)
(local $ws i32)
;; Async read into buffer at 0x100, length 4.
;; Should return BLOCKED (-1) since the host producer never writes.
(call $stream.read (local.get $sr) (i32.const 0x100) (i32.const 4))
i32.const -1 ;; BLOCKED
i32.ne
if unreachable end
;; Async cancel-read. The host write end is HostReady, so this returns
;; BLOCKED. Bug: the cancel unconditionally transitions GuestReady -> Open,
;; destroying the buffer info.
(local.set $cancel_result (call $stream.cancel-read (local.get $sr)))
;; If cancel returned BLOCKED (-1), wait for the cancel to complete.
;; This is where the bug manifests: when the host processes the cancel,
;; it accesses the read state which was corrupted from GuestReady to Open.
(if (i32.eq (local.get $cancel_result) (i32.const -1))
(then
(local.set $ws (call $waitable-set.new))
(call $waitable.join (local.get $sr) (local.get $ws))
;; Wait for the stream event (cancel completion). Event buffer at 0x200.
(drop (call $waitable-set.wait (local.get $ws) (i32.const 0x200)))
;; Unjoin stream from waitable-set (join to 0 = unjoin)
(call $waitable.join (local.get $sr) (i32.const 0))
(call $waitable-set.drop (local.get $ws))
)
)
;; Drop the stream
(call $stream.drop-readable (local.get $sr))
)
)
(type $s (stream u8))
(core func $stream.read (canon stream.read $s async (memory $libc "memory")))
(core func $stream.cancel-read (canon stream.cancel-read $s async))
(core func $stream.drop-readable (canon stream.drop-readable $s))
(canon waitable.join (core func $waitable.join))
(canon waitable-set.new (core func $waitable-set.new))
(canon waitable-set.wait (memory $libc "memory") (core func $waitable-set.wait))
(canon waitable-set.drop (core func $waitable-set.drop))
(core instance $i (instantiate $m
(with "" (instance
(export "stream.read" (func $stream.read))
(export "stream.cancel-read" (func $stream.cancel-read))
(export "stream.drop-readable" (func $stream.drop-readable))
(export "waitable.join" (func $waitable.join))
(export "waitable-set.new" (func $waitable-set.new))
(export "waitable-set.wait" (func $waitable-set.wait))
(export "waitable-set.drop" (func $waitable-set.drop))
))
))
(func (export "run") async (param "s" (stream u8))
(canon lift
(core func $i "run")
(memory $libc "memory")
)
)
)
"#;
    let component = Component::new(&engine, wat)?;

    let mut store = Store::new(&engine, ());
    let linker = Linker::new(&engine);
    let instance = linker.instantiate_async(&mut store, &component).await?;
    let run = instance.get_typed_func::<(StreamReader<u8>,), ()>(&mut store, "run")?;

    // Hand the guest the readable end of a stream whose host side never writes.
    let readable = StreamReader::new(&mut store, NeverWriteStreamProducer)?;
    run.call_async(&mut store, (readable,)).await?;
    Ok(())
}
/// The guest spawns two threads with `thread.new-indirect`, unsuspends each,
/// and then the main thread plus both spawned threads call the host import
/// `await-three-calls` through a synchronous lower. Each host call bumps a
/// counter in the store data and yields until that counter reaches 3. The test
/// finally asserts that the store's concurrent state is empty.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn concurrent_sync_calls_to_async_host() -> Result<()> {
    // Logger initialization is best-effort; its result is deliberately ignored.
    let _ = env_logger::try_init();

    let wat = r#"(component
(import "await-three-calls" (func $await-three-calls async))
(core module $libc
(table (export "__indirect_function_table") 1 funcref))
(core module $m
(import "" "await-three-calls" (func $await-three-calls))
(import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
(import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
(import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref))
(func (export "run")
(call $thread-new-indirect (i32.const 0) (i32.const 0))
(call $thread-unsuspend)
(call $thread-new-indirect (i32.const 0) (i32.const 0))
(call $thread-unsuspend)
(call $await-three-calls)
)
(func $thread-entry (param i32)
(call $await-three-calls)
)
(elem (table $indirect-function-table) (i32.const 0) func $thread-entry)
)
;; Instantiate the libc module to get the table
(core instance $libc (instantiate $libc))
;; Get access to `thread.new-indirect` that uses the table from libc
(core type $start-func-ty (func (param i32)))
(alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
(core func $thread-new-indirect
(canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
(core func $thread-unsuspend (canon thread.unsuspend))
(core func $await-three-calls (canon lower (func $await-three-calls) ))
(core instance $i (instantiate $m
(with "" (instance
(export "await-three-calls" (func $await-three-calls))
(export "thread.new-indirect" (func $thread-new-indirect))
(export "thread.unsuspend" (func $thread-unsuspend))
))
(with "libc" (instance $libc))
))
(func (export "run") async
(canon lift (core func $i "run")))
)"#;

    let mut cfg = Config::new();
    cfg.wasm_component_model_async(true);
    cfg.wasm_component_model_async_builtins(true);
    cfg.wasm_component_model_async_stackful(true);
    cfg.wasm_component_model_threading(true);
    let engine = Engine::new(&cfg)?;
    // The store data counts how many host calls have started so far.
    let mut store = Store::new(&engine, 0);
    let component = Component::new(&engine, wat)?;

    let mut linker = Linker::<i32>::new(&engine);
    let mut imports = linker.root();
    imports.func_wrap_concurrent("await-three-calls", |accessor, (): ()| {
        Box::pin(async move {
            // Register this call's arrival.
            accessor.with(|mut access| {
                *access.data_mut() += 1;
            });
            // Keep yielding until three calls have arrived, so no call can
            // finish before the other two have started.
            loop {
                let arrivals = accessor.with(|mut access| *access.data_mut());
                if arrivals >= 3 {
                    break;
                }
                tokio::task::yield_now().await;
            }
            Ok(())
        })
    })?;

    let instance = linker.instantiate_async(&mut store, &component).await?;
    let run = instance.get_typed_func::<(), ()>(&mut store, "run")?;
    run.call_async(&mut store, ()).await?;

    store.assert_concurrent_state_empty();
    Ok(())
}
/// Feeds a `bytes::Bytes` value holding `b"hello"` to the guest as a
/// `stream u8` and checks the raw code returned by the guest's `stream.read`
/// for a 1-byte request and for a 100-byte request.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn bytes_stream_producer() -> Result<()> {
    let wat = r#"
(component
(core module $libc (memory (export "mem") 1))
(core instance $libc (instantiate $libc))
(core module $m
(import "" "mem" (memory 1))
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
(func (export "read") (param i32 i32) (result i32)
(call $stream.read (local.get 0) (i32.const 0) (local.get 1))
)
)
(type $s (stream u8))
(core func $stream.read (canon stream.read $s async (memory $libc "mem")))
(core instance $i (instantiate $m
(with "" (instance
(export "mem" (memory $libc "mem"))
(export "stream.read" (func $stream.read))
))
))
(func (export "read") (param "s" (stream u8)) (param "l" u32) (result u32)
(canon lift (core func $i "read")))
)
"#;

    let mut cfg = Config::new();
    cfg.wasm_component_model_async(true);
    let engine = Engine::new(&cfg)?;
    let component = Component::new(&engine, wat)?;

    let linker = Linker::new(&engine);
    let mut store = Store::new(&engine, ());
    let instance = linker.instantiate_async(&mut store, &component).await?;
    let read = instance.get_typed_func::<(StreamReader<u8>, u32), (u32,)>(&mut store, "read")?;

    // The expected values pack a count into the bits above the low nibble and
    // a small code into the low nibble.
    // NOTE(review): presumably the codes are the component-model async
    // "completed" (0) and "dropped" (1) statuses — confirm against the ABI.

    // Request a single byte: count 1, code 0.
    let one_byte = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
    let short_read = read.call_async(&mut store, (one_byte, 1)).await?;
    assert_eq!(short_read, ((1 << 4) | 0,));

    // Request more than is available: count 5 (all of "hello"), code 1.
    let everything = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
    let full_read = read.call_async(&mut store, (everything, 100)).await?;
    assert_eq!(full_read, ((5 << 4) | 1,));

    Ok(())
}