1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use super::{Worker, STATUS_QUIET, STATUS_RUNNING, STATUS_TERMINATING};
use crate::{proto::HeartbeatStatus, Error};
use std::{
error::Error as StdError,
sync::{atomic, Arc},
time::{self, Duration},
};
// How often we wake up to inspect the workers' shared status slots for a
// state change (e.g. a worker having failed).
const CHECK_STATE_INTERVAL: Duration = Duration::from_millis(100);

// How often we send a heartbeat to the Faktory server. The server expects a
// beat at least every 15 seconds, so 5 seconds leaves a comfortable margin.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
impl<E> Worker<E>
where
    E: StdError,
{
    /// Send beats to Faktory and quiet/terminate workers if signalled so.
    ///
    /// Some core details:
    /// - beats should be sent to Faktory at least every 15 seconds;
    /// - a worker's lifecycle is "running -> quiet -> terminate";
    /// - STATUS_QUIET means the worker should not consume any new jobs,
    ///   but should _continue_ processing its current job (if any);
    ///
    /// Return value semantics:
    /// - `Ok(true)`: the server told us to TERMINATE — all workers have been
    ///   signalled to stop, and the current job should be failed immediately;
    /// - `Ok(false)`: a worker failed on its own (its status slot went to
    ///   `STATUS_TERMINATING` while we still considered the pool RUNNING) —
    ///   the remaining workers have been signalled to exit;
    /// - `Err(e)`: the heartbeat itself could not be issued — workers are
    ///   signalled to terminate and the error is propagated.
    ///
    /// See more details [here](https://github.com/contribsys/faktory/blob/b4a93227a3323ab4b1365b0c37c2fac4f9588cc8/server/workers.go#L13-L49).
    ///
    /// Note that this method is not cancellation safe. We are using an interval timer internally, that
    /// would be reset should we call this method anew. Besides, the `Heartbeat` command is being issued
    /// with the help of `AsyncWriteExt::write_all` which is not cancellation safe either.
    pub(crate) async fn listen_for_heartbeats(
        &mut self,
        statuses: &[Arc<atomic::AtomicUsize>],
    ) -> Result<bool, Error> {
        // The state we currently expect well-behaved workers to be in; starts
        // at RUNNING and is downgraded to QUIET when the server asks for it.
        let mut target = STATUS_RUNNING;

        // Timestamp of the last successfully issued heartbeat.
        let mut last = time::Instant::now();

        let mut check_state_interval = tokio::time::interval(CHECK_STATE_INTERVAL);
        // A tokio interval's first tick completes immediately; consume it here
        // so the first iteration of the loop below waits a full interval.
        check_state_interval.tick().await;

        loop {
            check_state_interval.tick().await;

            // has a worker failed?
            // A worker signals failure by flipping its status slot to
            // STATUS_TERMINATING. We only treat that as a failure while the
            // pool is still RUNNING; once we have moved past RUNNING (i.e.
            // the server asked us to quiet/terminate), TERMINATING statuses
            // are no longer unexpected.
            let worker_failure = target == STATUS_RUNNING
                && statuses
                    .iter()
                    .any(|s| s.load(atomic::Ordering::SeqCst) == STATUS_TERMINATING);
            if worker_failure {
                // tell all workers to exit
                // (though chances are they've all failed already)
                for s in statuses {
                    s.store(STATUS_TERMINATING, atomic::Ordering::SeqCst);
                }
                break Ok(false);
            }

            // We poll worker state every CHECK_STATE_INTERVAL (100ms), but
            // only beat the server every HEARTBEAT_INTERVAL (5s).
            if last.elapsed() < HEARTBEAT_INTERVAL {
                // don't send a heartbeat yet
                continue;
            }

            match self.c.heartbeat().await {
                Ok(hb) => {
                    match hb {
                        HeartbeatStatus::Ok => {
                            tracing::trace!("Faktory server HEARTBEAT status is OK.");
                        }
                        HeartbeatStatus::Quiet => {
                            tracing::trace!("Faktory server HEARTBEAT status is QUIET.");
                            // tell the workers to eventually terminate
                            for s in statuses {
                                s.store(STATUS_QUIET, atomic::Ordering::SeqCst);
                            }
                            // From now on, keep beating but stop treating
                            // TERMINATING worker statuses as failures.
                            target = STATUS_QUIET;
                        }
                        HeartbeatStatus::Terminate => {
                            tracing::trace!("Faktory server HEARTBEAT status is TERMINATE.");
                            // tell the workers to terminate
                            // *and* fail the current job and immediately return
                            for s in statuses {
                                s.store(STATUS_TERMINATING, atomic::Ordering::SeqCst);
                            }
                            break Ok(true);
                        }
                    }
                }
                Err(e) => {
                    // for this to fail, the workers have probably also failed
                    for s in statuses {
                        s.store(STATUS_TERMINATING, atomic::Ordering::SeqCst);
                    }
                    break Err(e);
                }
            }

            // Only reached on the Ok(Ok | Quiet) paths: record when we last
            // managed to beat the server.
            last = time::Instant::now();
        }
    }
}