use super::*;
#[cfg(feature = "journal")]
use crate::journal::JournalEffector;
use crate::{
WasiThreadHandle,
os::task::thread::WasiMemoryLayout,
runtime::{
TaintReason,
task_manager::{TaskWasm, TaskWasmRunProperties},
},
state::context_switching::ContextSwitchingEnvironment,
syscalls::*,
};
use wasmer::Memory;
use wasmer_wasix_types::wasi::ThreadStart;
#[instrument(level = "trace", skip_all, ret)]
pub fn thread_spawn_v2<M: MemorySize>(
mut ctx: FunctionEnvMut<'_, WasiEnv>,
start_ptr: WasmPtr<ThreadStart<M>, M>,
ret_tid: WasmPtr<Tid, M>,
) -> Result<Errno, WasiError> {
WasiEnv::do_pending_operations(&mut ctx)?;
let tid = wasi_try_ok!(thread_spawn_internal_from_wasi(&mut ctx, start_ptr));
let memory = unsafe { ctx.data().memory_view(&ctx) };
wasi_try_mem_ok!(ret_tid.write(&memory, tid));
tracing::debug!(
tid,
from_tid = ctx.data().thread.id().raw(),
"spawned new thread"
);
Ok(Errno::Success)
}
pub fn thread_spawn_internal_from_wasi<M: MemorySize>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
start_ptr: WasmPtr<ThreadStart<M>, M>,
) -> Result<Tid, Errno> {
let env = ctx.data();
let memory = unsafe { env.memory_view(&ctx) };
let runtime = env.runtime.clone();
let tasks = env.tasks().clone();
let start_ptr_offset = start_ptr.offset();
let layout = {
let start: ThreadStart<M> = start_ptr.read(&memory).map_err(mem_error_to_wasi)?;
let stack_upper: u64 = start.stack_upper.into();
let stack_size: u64 = start.stack_size.into();
let guard_size: u64 = start.guard_size.into();
let tls_base: u64 = start.tls_base.into();
let stack_lower = stack_upper - stack_size;
WasiMemoryLayout {
stack_upper,
stack_lower,
guard_size,
stack_size,
tls_base: Some(tls_base),
}
};
tracing::trace!(
from_tid = env.thread.id().raw(),
"thread_spawn with layout {:?}",
layout
);
let thread_start = ThreadStartType::ThreadSpawn {
start_ptr: start_ptr_offset.into(),
};
let mut thread_handle = match env.process.new_thread(layout.clone(), thread_start) {
Ok(h) => Arc::new(h),
Err(err) => {
error!(
stack_base = layout.stack_lower,
"failed to create thread handle",
);
return Err(Errno::Access);
}
};
let thread_id: Tid = thread_handle.id().into();
Span::current().record("tid", thread_id);
thread_spawn_internal_using_layout::<M>(ctx, thread_handle, layout, start_ptr_offset, None)?;
Ok(thread_id)
}
pub fn thread_spawn_internal_using_layout<M: MemorySize>(
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
thread_handle: Arc<WasiThreadHandle>,
layout: WasiMemoryLayout,
start_ptr_offset: M::Offset,
rewind_state: Option<(RewindState, RewindResultType)>,
) -> Result<(), Errno> {
let func_env = ctx.as_ref();
let mut store = ctx.as_store_mut();
let env = func_env.as_ref(&store);
let tasks = env.tasks().clone();
let env_inner = env.inner();
let module_handles = env_inner.main_module_instance_handles();
let thread_memory = module_handles.memory_clone();
let linker = env_inner.linker().cloned();
let state = env.state.clone();
let mut thread_env = env.clone();
thread_env.thread = thread_handle.as_thread();
thread_env.layout = layout;
thread_env.enable_deep_sleep = if cfg!(feature = "js") {
false
} else {
unsafe { env.capable_of_deep_sleep() }
};
let mut execute_module = {
let thread_handle = thread_handle;
move |ctx: WasiFunctionEnv, mut store: Store| {
call_module::<M>(ctx, store, start_ptr_offset, thread_handle, rewind_state)
}
};
if module_handles.thread_spawn.is_none() {
warn!("thread failed - the program does not export a `wasi_thread_start` function");
return Err(Errno::Notcapable);
}
let thread_module = module_handles.module_clone();
let spawn_type = match linker {
Some(linker) => {
let instance_group_data = linker.prepare_for_instance_group(ctx).map_err(|e| {
tracing::warn!("failed to prepare linker for thread spawn: {e}");
Errno::Notcapable
})?;
crate::runtime::SpawnType::NewLinkerInstanceGroup(instance_group_data)
}
None => crate::runtime::SpawnType::AttachMemory(
thread_memory.as_shared(&store).ok_or_else(|| {
tracing::warn!("Memory must be shared for thread spawning to work");
Errno::Memviolation
})?,
),
};
trace!("threading: spawning background thread");
let run = move |props: TaskWasmRunProperties| {
execute_module(props.ctx, props.store);
};
let mut task_wasm = TaskWasm::new(Box::new(run), thread_env, thread_module, false, false)
.with_memory(spawn_type);
tasks.task_wasm(task_wasm).map_err(Into::<Errno>::into)?;
Ok(())
}
fn call_module_internal<M: MemorySize>(
ctx: &WasiFunctionEnv,
mut store: Store,
start_ptr_offset: M::Offset,
) -> (Store, Result<Option<ExitCode>, DeepSleepWork>) {
let spawn = ctx
.data(&store)
.inner()
.main_module_instance_handles()
.thread_spawn
.clone()
.unwrap();
let tid = ctx.data(&store).tid();
let spawn: Function = spawn.into();
let tid_i32 = tid.raw().try_into().map_err(|_| Errno::Overflow).unwrap();
let start_pointer_i32 = start_ptr_offset
.try_into()
.map_err(|_| Errno::Overflow)
.unwrap();
let (mut store, thread_result) = ContextSwitchingEnvironment::run_main_context(
ctx,
store,
spawn,
vec![Value::I32(tid_i32), Value::I32(start_pointer_i32)],
);
let thread_result = thread_result.map(|_| ());
trace!("callback finished (ret={:?})", thread_result);
let exit_code = match handle_thread_result(ctx, &mut store, thread_result) {
Ok(code) => code,
Err(deep_sleep) => return (store, Err(deep_sleep)),
};
(store, Ok(exit_code))
}
fn handle_thread_result(
env: &WasiFunctionEnv,
store: &mut Store,
err: Result<(), RuntimeError>,
) -> Result<Option<ExitCode>, DeepSleepWork> {
let tid = env.data(&store).tid();
let pid = env.data(&store).pid();
let Err(err) = err else {
trace!("thread exited cleanly without calling thread_exit");
return Ok(None);
};
match err.downcast::<WasiError>() {
Ok(WasiError::ThreadExit) => {
trace!("thread exited cleanly");
Ok(None)
}
Ok(WasiError::Exit(code)) => {
trace!(exit_code = ?code, "thread requested exit");
if !code.is_success() {
env.data(&store)
.runtime
.on_taint(TaintReason::NonZeroExitCode(code));
};
Ok(Some(code))
}
Ok(WasiError::DeepSleep(deep)) => {
trace!("entered a deep sleep");
Err(deep)
}
Ok(WasiError::UnknownWasiVersion) => {
eprintln!(
"Thread {tid} of process {pid} failed because it has an unknown wasix version"
);
env.data(&store)
.runtime
.on_taint(TaintReason::UnknownWasiVersion);
Ok(Some(ExitCode::from(129)))
}
Ok(WasiError::DlSymbolResolutionFailed(symbol)) => {
eprintln!("Thread {tid} of process {pid} failed to find required symbol: {symbol}");
env.data(&store)
.runtime
.on_taint(TaintReason::DlSymbolResolutionFailed(symbol.clone()));
Ok(Some(ExitCode::from(129)))
}
Err(err) => {
if err.clone().to_trap() == Some(wasmer_types::TrapCode::HostInterrupt) {
debug!(%tid, %pid, error = %err, "thread interrupted by host");
} else {
eprintln!("Thread {tid} of process {pid} failed with runtime error: {err}");
}
env.data(&store)
.runtime
.on_taint(TaintReason::RuntimeError(err));
Ok(Some(ExitCode::from(129)))
}
}
}
fn call_module<M: MemorySize>(
mut ctx: WasiFunctionEnv,
mut store: Store,
start_ptr_offset: M::Offset,
thread_handle: Arc<WasiThreadHandle>,
rewind_state: Option<(RewindState, RewindResultType)>,
) {
let env = ctx.data(&store);
let tasks = env.tasks().clone();
if let Some((rewind_state, rewind_result)) = rewind_state {
let mut ctx = ctx.env.clone().into_mut(&mut store);
let res = rewind_ext::<M>(
&mut ctx,
Some(rewind_state.memory_stack),
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
return;
}
}
let (mut store, ret) = call_module_internal::<M>(&ctx, store, start_ptr_offset);
if let Err(deep) = ret {
let rewind = deep.rewind;
let respawn = {
let tasks = tasks.clone();
move |ctx, store, trigger_res| {
call_module::<M>(
ctx,
store,
start_ptr_offset,
thread_handle,
Some((rewind, RewindResultType::RewindWithResult(trigger_res))),
);
}
};
unsafe {
tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
};
return;
};
let exit_code = ret.unwrap_or_else(|_| unreachable!());
if let Some(exit_code) = exit_code {
ctx.on_exit(&mut store, Some(exit_code));
thread_handle.set_status_finished(Ok(exit_code));
} else {
ctx.on_exit(&mut store, None);
thread_handle.set_status_finished(Ok(Errno::Success.into()));
}
drop(thread_handle);
}