1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use super::{
cut::{self, Subscription},
proto::{membership_client::MembershipClient, Ack, Edge, Endpoint},
Cluster,
};
use futures::{
future::{join, FutureExt},
stream::{FuturesUnordered, StreamExt},
};
use std::{collections::HashMap, sync::Arc};
use tokio::{
select,
time::{delay_for, timeout},
};
impl Cluster {
pub(crate) async fn detect_faults(self: Arc<Self>, mut cuts: Subscription) -> cut::Result {
loop {
select! {
_ = self.spin_fd_probes() => {}
cut = cuts.recv() => { cut?; }
}
}
}
async fn spin_fd_probes(self: &Arc<Self>) {
let (conf_id, mut subjects) = async {
#[derive(Default)]
struct Subject {
rings: Vec<u64>,
faults: usize,
}
let mut subjects: HashMap<_, Subject> = HashMap::with_capacity(self.cfg.k);
let state = self.state.read().await;
for (ring, subject) in (state.nodes.successors(&self.local_node()))
.cloned()
.enumerate()
{
subjects.entry(subject).or_default().rings.push(ring as u64);
}
(state.conf_id, subjects)
}
.await;
loop {
let probes = (subjects.iter_mut())
.map(|(e, s)| self.probe(e, &mut s.faults))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {});
let mut state = join(delay_for(self.cfg.fd_timeout), probes)
.then(|_| self.state.write())
.await;
if state.conf_id != conf_id {
break;
}
let faulted = (subjects.iter_mut())
.filter(|(_, s)| s.faults >= self.cfg.fd_strikes)
.flat_map(|(e, s)| {
s.faults = 0;
s.rings.iter().map(move |ring| Edge::down(e.clone(), *ring))
});
self.enqueue_edges(&mut *state, faulted);
}
}
async fn probe(&self, subject: &Endpoint, faults: &mut usize) {
let send_probe = timeout(self.cfg.fd_timeout, async {
let e = self.resolve_endpoint(subject).ok()?;
let mut c = MembershipClient::connect(e).await.ok()?;
c.probe(Ack {}).await.ok()
});
match send_probe.await.ok().flatten() {
Some(_) => *faults = 0,
None => *faults += 1,
}
}
}