#![cfg(unix)]
use std::{
cell::UnsafeCell,
ffi::CStr,
sync::{
Arc, LazyLock,
atomic::{AtomicUsize, Ordering},
},
};
use dashmap::{DashMap, Entry};
use wasmer_types::StoreId;
use super::*;
struct StoreInterruptState {
pthread: usize,
interrupted: bool,
thread_current_signal_target_store: Arc<AtomicUsize>,
}
struct ThreadInterruptState {
active_stores: Vec<StoreId>,
current_active_store: AtomicUsize,
current_signal_target_store: Arc<AtomicUsize>,
}
static STORE_INTERRUPT_STATE: LazyLock<DashMap<StoreId, StoreInterruptState>> =
LazyLock::new(Default::default);
thread_local! {
static THREAD_INTERRUPT_STATE: UnsafeCell<ThreadInterruptState> =
UnsafeCell::new(ThreadInterruptState {
active_stores: vec![],
current_active_store: AtomicUsize::new(0),
current_signal_target_store: Arc::new(AtomicUsize::new(0)),
});
}
pub fn install(store_id: StoreId) -> Result<InterruptInstallGuard, InstallError> {
let store_state = STORE_INTERRUPT_STATE.entry(store_id).or_insert_with(|| {
let thread_current_signal_target_store = THREAD_INTERRUPT_STATE.with(|t| {
unsafe { t.get().as_mut().unwrap() }
.current_signal_target_store
.clone()
});
#[allow(trivial_numeric_casts)]
let pthread = unsafe { libc::pthread_self() as usize };
StoreInterruptState {
pthread,
interrupted: false,
thread_current_signal_target_store,
}
});
if store_state.interrupted {
return Err(InstallError::AlreadyInterrupted);
}
THREAD_INTERRUPT_STATE.with(|t| {
let borrow = unsafe { t.get().as_mut().unwrap() };
borrow.active_stores.push(store_id);
borrow
.current_active_store
.store(store_id.as_raw().get(), Ordering::Release);
});
Ok(InterruptInstallGuard { store_id })
}
pub(super) fn uninstall(store_id: StoreId) {
let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
panic!("Internal error: interrupt state not installed for store");
};
let has_more_installations = THREAD_INTERRUPT_STATE.with(|t| {
let borrow = unsafe { t.get().as_mut().unwrap() };
match borrow.active_stores.pop_if(|x| *x == store_id) {
Some(_) => {
borrow.current_active_store.store(
borrow
.active_stores
.last()
.map(|x| x.as_raw().get())
.unwrap_or(0),
Ordering::Release,
);
borrow.active_stores.contains(&store_id)
}
None => panic!("InterruptInstallGuard dropped out of order"),
}
});
if !has_more_installations {
store_state_entry.remove();
}
}
pub fn interrupt(store_id: StoreId) -> Result<(), InterruptError> {
let Entry::Occupied(mut store_state) = STORE_INTERRUPT_STATE.entry(store_id) else {
return Err(InterruptError::StoreNotRunning);
};
let store_state = store_state.get_mut();
if let Err(_) = store_state
.thread_current_signal_target_store
.compare_exchange(
0,
store_id.as_raw().get(),
Ordering::SeqCst,
Ordering::SeqCst,
)
{
return Err(InterruptError::OtherInterruptInProgress);
}
store_state.interrupted = true;
unsafe {
#[allow(trivial_numeric_casts)]
let errno = libc::pthread_kill(store_state.pthread as libc::pthread_t, libc::SIGUSR1);
if errno != 0 {
let error_str = CStr::from_ptr(libc::strerror(errno)).to_str().unwrap();
return Err(InterruptError::FailedToSendSignal(error_str));
}
}
Ok(())
}
pub(crate) fn on_interrupted() -> bool {
THREAD_INTERRUPT_STATE.with(|t| {
let state = unsafe { t.get().as_ref().unwrap() };
let current_active_store = state.current_active_store.load(Ordering::Acquire);
let current_signal_target_store = state.current_signal_target_store.load(Ordering::Acquire);
assert_ne!(
current_signal_target_store, 0,
"current_signal_target_store should be set before signalling the WASM thread"
);
if let Err(_) = state.current_signal_target_store.compare_exchange(
current_signal_target_store,
0,
Ordering::SeqCst,
Ordering::SeqCst,
) {
unreachable!("current_signal_target_store isn't changed unless it's zero");
}
current_active_store == current_signal_target_store
})
}
pub fn is_interrupted(store_id: StoreId) -> bool {
let Entry::Occupied(store_state_entry) = STORE_INTERRUPT_STATE.entry(store_id) else {
return false;
};
store_state_entry.get().interrupted
}