elfo_core/
stuck_detection.rs

1use std::{
2    sync::Arc,
3    thread::{self, Thread, ThreadId},
4};
5
6use derive_more::Constructor;
7use fxhash::FxHashMap;
8use parking_lot::Mutex;
9use thread_local::ThreadLocal;
10
11use elfo_utils::ward;
12
13use crate::{actor::ActorMeta, scope};
14
15/// Detects stuck actors.
16/// Usually, `on_thread_park` is good place to call [`StuckDetector::check()`].
17#[derive(Default, Clone)]
18pub struct StuckDetector(Arc<Inner>);
19
20/// Information about a stuck actor, returned by [`StuckDetector::check()`].
21#[derive(Constructor)]
22pub struct StuckActorInfo {
23    meta: Arc<ActorMeta>,
24    thread: Thread,
25}
26
27impl StuckActorInfo {
28    /// Returns the actor's meta.
29    pub fn meta(&self) -> &Arc<ActorMeta> {
30        &self.meta
31    }
32
33    /// Returns a thread where the actor runs.
34    pub fn thread(&self) -> &Thread {
35        &self.thread
36    }
37}
38
39type Epoch = u32;
40
41#[derive(Default)]
42struct Inner {
43    tls: ThreadLocal<Mutex<PerThread>>,
44    last_check: Mutex<FxHashMap<ThreadId, (Arc<ActorMeta>, Epoch)>>,
45}
46
47struct PerThread {
48    thread: Thread,
49    meta: Option<Arc<ActorMeta>>,
50    epoch: Epoch,
51}
52
53impl PerThread {
54    fn current() -> Self {
55        Self {
56            thread: thread::current(),
57            meta: None,
58            epoch: 0,
59        }
60    }
61}
62
63impl StuckDetector {
64    pub(crate) fn enter(&self) {
65        let slot = self.0.tls.get_or(|| Mutex::new(PerThread::current()));
66        let mut item = slot.lock();
67
68        // `ThreadLocal` can reuse the slot, so we need to reset it explicitly.
69        if item.thread.id() != thread::current().id() {
70            *item = PerThread::current();
71        }
72
73        item.meta = scope::try_meta();
74        item.epoch = item.epoch.wrapping_add(1);
75    }
76
77    pub(crate) fn exit(&self) {
78        let slot = ward!(self.0.tls.get());
79        let mut item = slot.lock();
80        item.meta = None;
81    }
82
83    /// Returns actors that run at the same thread since the last call.
84    pub fn check(&self) -> impl Iterator<Item = StuckActorInfo> {
85        let mut last_check = self.0.last_check.lock();
86        let mut check = FxHashMap::with_capacity_and_hasher(last_check.len(), Default::default());
87        let mut stuck = Vec::new();
88
89        for slot in &self.0.tls {
90            let current = slot.lock();
91            let current_meta = ward!(&current.meta, continue);
92            let thread_id = current.thread.id();
93
94            check.insert(thread_id, (current_meta.clone(), current.epoch));
95
96            let (prev_meta, prev_epoch) = ward!(last_check.get(&thread_id), continue);
97            if current.epoch == *prev_epoch && current_meta == prev_meta {
98                stuck.push(StuckActorInfo::new(
99                    current_meta.clone(),
100                    current.thread.clone(),
101                ));
102            }
103        }
104
105        *last_check = check;
106
107        stuck.into_iter()
108    }
109}