1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use super::{Shared, TaskQueues};
use crate::bee::{Queen, Worker};
use std::io::Error as SpawnError;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
/// Sentinel for a worker thread. Until the sentinel is cancelled, it will respawn the worker
/// thread if it panics.
pub struct Sentinel<W, Q, T, F>
where
W: Worker,
Q: Queen<Kind = W>,
T: TaskQueues<W>,
F: Fn(usize, &Arc<Shared<Q, T>>) -> Result<JoinHandle<()>, SpawnError> + 'static,
{
/// The index of the worker thread
thread_index: usize,
/// The shared data to pass to the new worker thread when respawning
shared: Arc<Shared<Q, T>>,
/// Whether sentinel is active
active: bool,
/// The function that will be called to respawn the worker thread
respawn_fn: F,
}
impl<W, Q, T, F> Sentinel<W, Q, T, F>
where
W: Worker,
Q: Queen<Kind = W>,
T: TaskQueues<W>,
F: Fn(usize, &Arc<Shared<Q, T>>) -> Result<JoinHandle<()>, SpawnError> + 'static,
{
pub fn new(thread_index: usize, shared: Arc<Shared<Q, T>>, respawn_fn: F) -> Self {
Self {
thread_index,
shared,
active: true,
respawn_fn,
}
}
/// Cancel and destroy this sentinel.
pub fn cancel(mut self) {
self.active = false;
}
}
impl<W, Q, T, F> Drop for Sentinel<W, Q, T, F>
where
W: Worker,
Q: Queen<Kind = W>,
T: TaskQueues<W>,
F: Fn(usize, &Arc<Shared<Q, T>>) -> Result<JoinHandle<()>, SpawnError> + 'static,
{
fn drop(&mut self) {
if self.active {
// if the sentinel is active, that means the thread panicked during task execution, so
// we have to finish the task here before respawning
self.shared.finish_task(thread::panicking());
// only respawn if the sentinel is active and the hive has not been poisoned
if !self.shared.is_poisoned() {
// can't do anything with the previous JoinHandle
let _ = self
.shared
.respawn_thread(self.thread_index, |thread_index| {
(self.respawn_fn)(thread_index, &self.shared)
});
}
}
}
}