// The `tokio` submodule is only compiled when the `sys-thread` Cargo feature is enabled.
#[cfg(feature = "sys-thread")]
pub mod tokio;
use std::ops::Deref;
use std::task::{Context, Poll};
use std::{pin::Pin, time::Duration};
use bytes::Bytes;
use derive_more::Debug;
use futures::future::BoxFuture;
use futures::{Future, TryFutureExt};
use wasmer::{AsStoreMut, Memory, MemoryType, Module, SharedMemory, Store, StoreMut};
use wasmer_wasix_types::wasi::{Errno, ExitCode};
use crate::os::task::thread::WasiThreadError;
use crate::{StoreSnapshot, WasiEnv, WasiFunctionEnv, WasiThread, capture_store_snapshot};
use crate::{state::PreparedInstanceGroupData, syscalls::AsyncifyFuture};
pub use virtual_mio::waker::*;
/// Describes how the linear memory of a spawned WebAssembly task is obtained.
///
/// The default `VirtualTaskManager::build_memory` implementation in this file
/// shows how each variant is interpreted.
#[derive(Debug)]
pub enum SpawnType {
/// No memory is built up front: the default `build_memory` returns `Ok(None)`.
CreateMemory,
/// Create a new memory of the given type. The default `build_memory` forces the
/// type to be shared and fills in a maximum size when none is set.
CreateMemoryOfType(MemoryType),
/// Attach an already existing shared memory to the target store.
AttachMemory(SharedMemory),
/// Carries prepared instance-group data. The default `build_memory` builds no
/// memory for this variant (`Ok(None)`).
// The Debug output deliberately elides the payload.
#[debug("NewLinkerInstanceGroup(..)")]
NewLinkerInstanceGroup(PreparedInstanceGroupData),
}
/// Either a request for a memory (default or of a specific type) or an already
/// existing store together with its memory.
// NOTE(review): nothing in this part of the file consumes this enum; confirm the
// exact meaning of each variant against its users.
pub enum SpawnMemoryTypeOrStore {
/// No memory type or store is supplied.
New,
/// Only a memory type is supplied.
Type(wasmer::MemoryType),
/// A store and a memory are supplied together.
StoreAndMemory(wasmer::Store, Memory),
}
/// One-shot callback used to resume a WebAssembly task.
///
/// `resume_wasm_after_poller` invokes it with the function environment, the store
/// and the bytes produced by the resume trigger.
pub type WasmResumeTask = dyn FnOnce(WasiFunctionEnv, Store, Bytes) + Send + 'static;
/// One-shot factory for the future that decides when (and with what data) a
/// WebAssembly task is resumed.
///
/// The future resolves to `Ok(bytes)` to resume with a payload, or to
/// `Err(exit_code)` when the task should not be resumed.
pub type WasmResumeTrigger = dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<Bytes, ExitCode>> + Send + 'static>>
+ Send
+ Sync;
/// Everything handed to a [`TaskWasmRun`] callback when it is invoked.
#[derive(derive_more::Debug)]
pub struct TaskWasmRunProperties {
/// Function environment the task runs with.
pub ctx: WasiFunctionEnv,
/// Store the task runs with.
pub store: Store,
/// Outcome of the resume trigger. `resume_wasm_after_poller` expects this to be
/// `Some(..)` and panics otherwise.
// NOTE(review): presumably `None` when the task was spawned without a trigger - confirm.
pub trigger_result: Option<Result<Bytes, ExitCode>>,
/// Optional recycle callback; excluded from the `Debug` output.
#[debug(ignore)]
pub recycle: Option<Box<TaskWasmRecycle>>,
}
/// One-shot asynchronous callback that receives mutable access to the function
/// environment and the store, and returns a boxed future borrowing both.
// NOTE(review): the name suggests it is awaited before the `run` callback; the
// call site is not in this part of the file - confirm.
pub type TaskWasmPreRun = dyn (for<'a> FnOnce(
&'a mut WasiFunctionEnv,
&'a mut Store,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
+ Send;
/// One-shot callback that executes a WebAssembly task given its [`TaskWasmRunProperties`].
pub type TaskWasmRun = dyn FnOnce(TaskWasmRunProperties) + Send + 'static;
/// One-shot callback that receives a [`Module`]; same shape as the `task`
/// argument of `VirtualTaskManager::spawn_with_module`.
pub type TaskExecModule = dyn FnOnce(Module) + Send + 'static;
/// Values handed to a [`TaskWasmRecycle`] callback.
#[derive(Debug)]
pub struct TaskWasmRecycleProperties {
/// WASI environment of the task.
pub env: WasiEnv,
/// Memory of the task.
pub memory: Memory,
/// Store of the task.
pub store: Store,
}
/// One-shot callback that takes ownership of a task's [`TaskWasmRecycleProperties`].
// NOTE(review): presumably invoked once the task has finished so resources can be
// reused - the call site is not in this part of the file, confirm.
pub type TaskWasmRecycle = dyn FnOnce(TaskWasmRecycleProperties) + Send + 'static;
/// The set of callbacks attached to a [`TaskWasm`].
///
/// Only `run` is mandatory; `TaskWasm::new` initialises the others to `None` and
/// the `with_*` builder methods fill them in.
pub struct TaskWasmCallbacks {
/// Callback that runs the task.
pub run: Box<TaskWasmRun>,
/// Optional recycle callback (set via `TaskWasm::with_recycle`).
pub recycle: Option<Box<TaskWasmRecycle>>,
/// Optional asynchronous pre-run callback (set via `TaskWasm::with_pre_run`).
pub pre_run: Option<Box<TaskWasmPreRun>>,
/// Optional resume trigger (set via `TaskWasm::with_trigger`).
pub trigger: Option<Box<WasmResumeTrigger>>,
}
/// Description of a WebAssembly task, passed to `VirtualTaskManager::task_wasm`.
pub struct TaskWasm {
/// Callbacks attached to the task.
pub callbacks: TaskWasmCallbacks,
/// WASI environment the task is created with.
pub env: WasiEnv,
/// Module the task is created with.
pub module: Module,
/// Optional store snapshot (set via `with_globals`).
pub globals: Option<StoreSnapshot>,
/// How the task's memory is obtained.
pub spawn_type: SpawnType,
// NOTE(review): the consumers of the two flags below are not in this part of the
// file; their exact effect should be confirmed against the task manager
// implementations.
/// Flag forwarded unchanged from `TaskWasm::new`.
pub update_layout: bool,
/// Flag forwarded unchanged from `TaskWasm::new`.
pub call_initialize: bool,
}
impl TaskWasm {
    /// Creates a task description with only the mandatory `run` callback set.
    ///
    /// The initial [`SpawnType`] is derived from the module: when the module
    /// imports a memory, the type of the first imported memory is used
    /// (`CreateMemoryOfType`); otherwise `CreateMemory` is selected.
    pub fn new(
        run: Box<TaskWasmRun>,
        env: WasiEnv,
        module: Module,
        update_layout: bool,
        call_initialize: bool,
    ) -> Self {
        // Only the first memory import (if any) determines the spawn type.
        let spawn_type = module
            .imports()
            .memories()
            .next()
            .map_or(SpawnType::CreateMemory, |import| {
                SpawnType::CreateMemoryOfType(*import.ty())
            });

        let callbacks = TaskWasmCallbacks {
            run,
            recycle: None,
            pre_run: None,
            trigger: None,
        };

        Self {
            callbacks,
            env,
            module,
            globals: None,
            spawn_type,
            update_layout,
            call_initialize,
        }
    }

    /// Overrides how the memory of the task is obtained.
    pub fn with_memory(mut self, spawn_type: SpawnType) -> Self {
        self.spawn_type = spawn_type;
        self
    }

    /// Like [`TaskWasm::with_memory`], but keeps the current spawn type when
    /// `None` is passed.
    pub fn with_optional_memory(self, spawn_type: Option<SpawnType>) -> Self {
        match spawn_type {
            Some(kind) => self.with_memory(kind),
            None => self,
        }
    }

    /// Attaches a store snapshot, replacing any previously set one.
    pub fn with_globals(mut self, snapshot: StoreSnapshot) -> Self {
        self.globals = Some(snapshot);
        self
    }

    /// Sets the resume trigger, replacing any previously set one.
    pub fn with_trigger(mut self, trigger: Box<WasmResumeTrigger>) -> Self {
        self.callbacks.trigger = Some(trigger);
        self
    }

    /// Sets the recycle callback, replacing any previously set one.
    pub fn with_recycle(mut self, recycle: Box<TaskWasmRecycle>) -> Self {
        self.callbacks.recycle = Some(recycle);
        self
    }

    /// Sets the pre-run callback, replacing any previously set one.
    pub fn with_pre_run(mut self, pre_run: Box<TaskWasmPreRun>) -> Self {
        self.callbacks.pre_run = Some(pre_run);
        self
    }
}
/// Abstraction over the facility that creates memories and schedules tasks
/// (futures, blocking closures and WebAssembly tasks).
#[allow(unused_variables)]
pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
    /// Builds the memory described by `spawn_type` inside `store`.
    ///
    /// Default behaviour:
    /// * `CreateMemoryOfType` - the type is forced to be shared, a missing
    ///   maximum is set to the largest page count, and a new memory is created.
    /// * `AttachMemory` - the shared memory is attached to `store`.
    /// * `CreateMemory` / `NewLinkerInstanceGroup` - no memory is built
    ///   (`Ok(None)`).
    ///
    /// # Errors
    ///
    /// Returns `WasiThreadError::MemoryCreateFailed` when the memory cannot be
    /// created; the failure is also logged.
    fn build_memory(
        &self,
        mut store: &mut StoreMut,
        spawn_type: SpawnType,
    ) -> Result<Option<Memory>, WasiThreadError> {
        let memory = match spawn_type {
            SpawnType::CreateMemory | SpawnType::NewLinkerInstanceGroup(..) => None,
            SpawnType::AttachMemory(shared) => Some(shared.attach(store)),
            SpawnType::CreateMemoryOfType(mut ty) => {
                ty.shared = true;
                if ty.maximum.is_none() {
                    ty.maximum = Some(wasmer_types::Pages::max_value());
                }
                match Memory::new(&mut store, ty) {
                    Ok(created) => Some(created),
                    Err(err) => {
                        tracing::error!(
                            error = &err as &dyn std::error::Error,
                            memory_type=?ty,
                            "could not create memory",
                        );
                        return Err(WasiThreadError::MemoryCreateFailed(err));
                    }
                }
            }
        };
        Ok(memory)
    }

    /// Returns a boxed future tied to the duration `time`.
    // NOTE(review): presumably the future completes once `time` has elapsed -
    // confirm against the implementations.
    fn sleep_now(
        &self,
        time: Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

    /// Schedules a task given as a closure that produces a boxed future.
    fn task_shared(
        &self,
        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
    ) -> Result<(), WasiThreadError>;

    /// Schedules a WebAssembly task described by `task`.
    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError>;

    /// Schedules a synchronous closure.
    // NOTE(review): the name suggests a dedicated thread is used - confirm
    // against the implementations.
    fn task_dedicated(
        &self,
        task: Box<dyn FnOnce() + Send + 'static>,
    ) -> Result<(), WasiThreadError>;

    /// Reports the degree of parallelism available to tasks.
    fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;

    /// Runs `task` with `module` as its argument.
    ///
    /// The default implementation wraps the call in a closure and hands it to
    /// [`VirtualTaskManager::task_dedicated`].
    fn spawn_with_module(
        &self,
        module: Module,
        task: Box<dyn FnOnce(Module) + Send + 'static>,
    ) -> Result<(), WasiThreadError> {
        let job = move || task(module);
        self.task_dedicated(Box::new(job))
    }
}
/// Blanket implementation: any `'static`, thread-safe, debuggable pointer-like
/// type that dereferences to a task manager is itself a task manager. Every
/// method forwards to the pointee.
impl<D, T> VirtualTaskManager for D
where
    D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
    T: VirtualTaskManager + ?Sized,
{
    /// Forwards to the pointee's `build_memory`.
    fn build_memory(
        &self,
        store: &mut StoreMut,
        spawn_type: SpawnType,
    ) -> Result<Option<Memory>, WasiThreadError> {
        T::build_memory(&**self, store, spawn_type)
    }

    /// Forwards to the pointee's `sleep_now`.
    fn sleep_now(
        &self,
        time: Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>> {
        T::sleep_now(&**self, time)
    }

    /// Forwards to the pointee's `task_shared`.
    fn task_shared(
        &self,
        task: Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + 'static>,
    ) -> Result<(), WasiThreadError> {
        T::task_shared(&**self, task)
    }

    /// Forwards to the pointee's `task_wasm`.
    fn task_wasm(&self, task: TaskWasm) -> Result<(), WasiThreadError> {
        T::task_wasm(&**self, task)
    }

    /// Forwards to the pointee's `task_dedicated`.
    fn task_dedicated(
        &self,
        task: Box<dyn FnOnce() + Send + 'static>,
    ) -> Result<(), WasiThreadError> {
        T::task_dedicated(&**self, task)
    }

    /// Forwards to the pointee's `thread_parallelism`.
    fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
        T::thread_parallelism(&**self)
    }

    /// Forwards to the pointee's `spawn_with_module`, so an override on the
    /// pointee is honoured rather than the trait default.
    fn spawn_with_module(
        &self,
        module: Module,
        task: Box<dyn FnOnce(Module) + Send + 'static>,
    ) -> Result<(), WasiThreadError> {
        T::spawn_with_module(&**self, module, task)
    }
}
impl dyn VirtualTaskManager {
#[doc(hidden)]
pub unsafe fn resume_wasm_after_poller(
&self,
task: Box<WasmResumeTask>,
ctx: WasiFunctionEnv,
mut store: Store,
trigger: Pin<Box<AsyncifyFuture>>,
) -> Result<(), WasiThreadError> {
struct AsyncifyPollerOwned {
thread: WasiThread,
trigger: Pin<Box<AsyncifyFuture>>,
}
impl Future for AsyncifyPollerOwned {
type Output = Result<Bytes, ExitCode>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let work = self.trigger.as_mut();
Poll::Ready(if let Poll::Ready(res) = work.poll(cx) {
Ok(res)
} else if let Some(forced_exit) = self.thread.try_join() {
return Poll::Ready(Err(forced_exit.unwrap_or_else(|err| {
tracing::debug!("exit runtime error - {}", err);
Errno::Child.into()
})));
} else {
return Poll::Pending;
})
}
}
let snapshot = capture_store_snapshot(&mut store.as_store_mut());
let env = ctx.data(&store);
let env_inner = env.inner();
let handles = env_inner
.static_module_instance_handles()
.ok_or(WasiThreadError::Unsupported)?;
let module = handles.module_clone();
let memory = handles.memory_clone();
let thread = env.thread.clone();
let env = env.clone();
let thread_inner = thread.clone();
self.task_wasm(
TaskWasm::new(
Box::new(move |props| {
let result = props
.trigger_result
.expect("If there is no result then its likely the trigger did not run");
let result = match result {
Ok(r) => r,
Err(exit_code) => {
thread.set_status_finished(Ok(exit_code));
return;
}
};
task(props.ctx, props.store, result)
}),
env.clone(),
module,
false,
false,
)
.with_memory(SpawnType::AttachMemory(
memory.as_shared(&store).ok_or_else(|| {
tracing::error!("failed to get shared memory for asyncify poller");
WasiThreadError::Unsupported
})?,
))
.with_globals(snapshot)
.with_trigger(Box::new(move || {
Box::pin(async move {
let mut poller = AsyncifyPollerOwned {
thread: thread_inner,
trigger,
};
let res = Pin::new(&mut poller).await;
let res = match res {
Ok(res) => res,
Err(exit_code) => {
env.thread.set_status_finished(Ok(exit_code));
return Err(exit_code);
}
};
tracing::trace!("deep sleep woken - res.len={}", res.len());
Ok(res)
})
})),
)
}
}
/// Generic convenience helpers layered on top of [`VirtualTaskManager`].
pub trait VirtualTaskManagerExt {
/// Spawns `task` and blocks the calling thread until it has produced its output.
///
/// Returns an error when the output could not be received.
fn spawn_and_block_on<A>(
&self,
task: impl Future<Output = A> + Send + 'static,
) -> Result<A, anyhow::Error>
where
A: Send + 'static;
/// Spawns the synchronous closure `f` and returns a future that resolves to its
/// return value, or to an error when the value could not be received.
fn spawn_await<O, F>(
&self,
f: F,
) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
where
O: Send + 'static,
F: FnOnce() -> O + Send + 'static;
}
/// Implements the helpers for every pointer-like type that dereferences to a
/// [`VirtualTaskManager`].
impl<D, T> VirtualTaskManagerExt for D
where
    D: Deref<Target = T>,
    T: VirtualTaskManager + ?Sized,
{
    /// Runs `task` through `task_shared` and blocks the calling thread until
    /// the task has sent back its output.
    ///
    /// # Errors
    ///
    /// * the task manager refused to spawn the task (previously this panicked);
    /// * the task was dropped before it produced a result.
    ///
    /// # Panics
    ///
    /// `blocking_recv` panics when called from within an asynchronous
    /// execution context, so this must only be used from synchronous code.
    fn spawn_and_block_on<A>(
        &self,
        task: impl Future<Output = A> + Send + 'static,
    ) -> Result<A, anyhow::Error>
    where
        A: Send + 'static,
    {
        let (tx, rx) = ::tokio::sync::oneshot::channel();
        let work = Box::pin(async move {
            let ret = task.await;
            // The receiver may already be gone; nothing useful can be done then.
            tx.send(ret).ok();
        });
        // Report a spawn failure through the existing error channel rather than
        // panicking. `Debug` formatting is used because it is the only
        // formatting bound known to hold for the error type here.
        self.task_shared(Box::new(move || work))
            .map_err(|err| anyhow::anyhow!("failed to spawn shared task - {:?}", err))?;
        rx.blocking_recv()
            .map_err(|_| anyhow::anyhow!("task execution failed - result channel dropped"))
    }

    /// Runs the synchronous closure `f` through `task_dedicated` and returns a
    /// future resolving to its return value.
    ///
    /// The returned future resolves to an error when the task manager refused
    /// to spawn the closure (previously this panicked) or when the closure was
    /// dropped before sending its result.
    fn spawn_await<O, F>(
        &self,
        f: F,
    ) -> Box<dyn Future<Output = Result<O, Box<dyn std::error::Error>>> + Unpin + Send + 'static>
    where
        O: Send + 'static,
        F: FnOnce() -> O + Send + 'static,
    {
        let (sender, receiver) = ::tokio::sync::oneshot::channel();
        let spawned = self.task_dedicated(Box::new(move || {
            let result = f();
            // The receiver may already be gone; nothing useful can be done then.
            let _ = sender.send(result);
        }));

        if let Err(err) = spawned {
            // Only the message (a `String`, which is `Send`) is captured, so the
            // returned future stays `Send`; the boxed error is built on poll.
            let message = format!("failed to spawn dedicated task - {:?}", err);
            return Box::new(Box::pin(async move {
                Err::<O, Box<dyn std::error::Error>>(message.into())
            }));
        }

        Box::new(receiver.map_err(|e| Box::new(e).into()))
    }
}