use std::{
sync::Arc,
thread::{self, Thread, ThreadId},
};
use derive_more::Constructor;
use fxhash::FxHashMap;
use parking_lot::Mutex;
use thread_local::ThreadLocal;
use elfo_utils::ward;
use crate::{actor::ActorMeta, scope};
/// Handle to the stuck-actor detector.
///
/// The state lives behind an `Arc`, so cloning the handle yields another
/// reference to the same per-thread records and the same last-check snapshot.
#[derive(Default, Clone)]
pub struct StuckDetector(Arc<Inner>);
/// One entry produced by `StuckDetector::check`: the actor's meta together
/// with the thread whose record held it.
#[derive(Constructor)]
pub struct StuckActorInfo {
/// Meta of the reported actor.
meta: Arc<ActorMeta>,
/// Handle of the thread whose record contained the actor.
thread: Thread,
}
impl StuckActorInfo {
    /// Returns the meta of the reported actor.
    pub fn meta(&self) -> &Arc<ActorMeta> {
        let Self { meta, .. } = self;
        meta
    }

    /// Returns the handle of the thread the actor was observed on.
    pub fn thread(&self) -> &Thread {
        let Self { thread, .. } = self;
        thread
    }
}
// Per-thread counter type; `StuckDetector::enter` increments it (wrapping) on every call.
type Epoch = u32;
/// State shared by every clone of `StuckDetector`.
#[derive(Default)]
struct Inner {
/// Mutex-guarded `PerThread` record for each thread that has called `enter`.
tls: ThreadLocal<Mutex<PerThread>>,
/// Snapshot stored by the previous `check` call: for each thread id, the
/// actor meta and epoch that were observed at that time.
last_check: Mutex<FxHashMap<ThreadId, (Arc<ActorMeta>, Epoch)>>,
}
/// Record kept for a single thread inside `Inner::tls`.
struct PerThread {
/// Handle of the thread this record was created for.
thread: Thread,
/// Meta stored by `enter` (whatever `scope::try_meta()` returned) and
/// cleared to `None` by `exit`.
meta: Option<Arc<ActorMeta>>,
/// Incremented on every `enter`; compared between two `check` calls.
epoch: Epoch,
}
impl PerThread {
    /// Builds a fresh record for the calling thread: no actor meta and a
    /// zero epoch.
    fn current() -> Self {
        let thread = thread::current();
        Self {
            thread,
            meta: None,
            epoch: 0,
        }
    }
}
impl StuckDetector {
    /// Marks the calling thread as having entered a scope.
    ///
    /// Stores the result of `scope::try_meta()` in the thread's record and
    /// increments the record's epoch (wrapping on overflow), so that two
    /// consecutive `check` calls can tell whether `enter` ran in between.
    pub(crate) fn enter(&self) {
        let slot = self.0.tls.get_or(|| Mutex::new(PerThread::current()));
        let mut item = slot.lock();

        // The record may have been created for another thread; presumably the
        // slot can be reused once its previous owner has exited (TODO confirm
        // against `thread_local` docs). Start from a clean record in that case.
        if item.thread.id() != thread::current().id() {
            *item = PerThread::current();
        }

        item.meta = scope::try_meta();
        item.epoch = item.epoch.wrapping_add(1);
    }

    /// Marks the calling thread as having left the scope by clearing the meta
    /// in its record.
    ///
    /// If the thread has no record yet, `ward!` bails out (presumably by
    /// returning early — see `elfo_utils::ward`).
    pub(crate) fn exit(&self) {
        let slot = ward!(self.0.tls.get());
        let mut item = slot.lock();
        item.meta = None;
    }

    /// Compares every thread's record with the snapshot taken by the previous
    /// call and returns the entries that did not change.
    ///
    /// A thread is reported when its record currently holds a meta and both
    /// that meta and the epoch are equal to the ones stored for the same
    /// thread id by the previous `check`. Threads without a meta, or without
    /// an entry in the previous snapshot, are never reported. The snapshot is
    /// replaced by the freshly collected one before returning.
    pub fn check(&self) -> impl Iterator<Item = StuckActorInfo> {
        let mut last_check = self.0.last_check.lock();
        let mut check = FxHashMap::with_capacity_and_hasher(last_check.len(), Default::default());
        let mut stuck = Vec::new();

        for slot in &self.0.tls {
            let current = slot.lock();

            // Records with no meta are skipped entirely.
            let current_meta = ward!(&current.meta, continue);
            let thread_id = current.thread.id();

            // Record the observation first so that it is part of the next
            // snapshot even when there is nothing to compare against yet.
            check.insert(thread_id, (current_meta.clone(), current.epoch));

            let (prev_meta, prev_epoch) = ward!(last_check.get(&thread_id), continue);
            if current.epoch == *prev_epoch && current_meta == prev_meta {
                stuck.push(StuckActorInfo::new(
                    current_meta.clone(),
                    current.thread.clone(),
                ));
            }
        }

        *last_check = check;
        stuck.into_iter()
    }
}