use crate::monitor::event::{Event, EventType};
use crate::monitor::MonitorConfig;
use crate::thread::pool::{Message, Thread};
use std::panic;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{panicking, spawn, JoinHandle};
pub struct PanicMarker(pub usize, pub Sender<usize>);
pub struct RecoveryThread(pub Option<JoinHandle<()>>);
impl RecoveryThread {
pub fn new(
rx: Receiver<usize>,
tx: Sender<usize>,
task_rx: Arc<Mutex<Receiver<Message>>>,
threads: Arc<Mutex<Vec<Thread>>>,
monitor: Option<MonitorConfig>,
) -> Self {
if let Some(monitor) = monitor.clone() {
if monitor.mask() & EventType::ThreadPoolPanic as u32 != 0 {
let sync_monitor = Mutex::new(monitor);
let sync_monitor: &'static Mutex<MonitorConfig> = Box::leak(Box::new(sync_monitor));
panic::set_hook(Box::new(move |info| {
if let Ok(monitor) = sync_monitor.lock() {
if let Some(info) = info.location() {
monitor.send(Event::new(EventType::ThreadPoolPanic).with_info(
format!(
"Thread {} panicked at {}:{}:{}",
std::thread::current().name().unwrap_or("<unknown>"),
info.file(),
info.line(),
info.column()
),
));
} else {
monitor.send(Event::new(EventType::ThreadPoolPanic).with_info(
format!(
"Thread {} panicked, no location available",
std::thread::current().name().unwrap_or("<unknown>"),
),
));
}
}
}))
};
}
let thread = spawn(move || loop {
for panicking_thread in &rx {
let mut threads = threads.lock().unwrap();
if let Some(thread) = threads[panicking_thread].os_thread.take() {
thread.join().ok();
}
let restarted_thread = Thread::new(
panicking_thread,
task_rx.clone(),
tx.clone(),
monitor.clone(),
);
threads[panicking_thread] = restarted_thread;
if let Some(monitor) = &monitor {
monitor.send(
Event::new(EventType::ThreadRestarted)
.with_info(format!("Thread {} was restarted", panicking_thread)),
);
}
}
});
Self(Some(thread))
}
}
impl Drop for PanicMarker {
fn drop(&mut self) {
if panicking() {
self.1.send(self.0).ok();
}
}
}