1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
use {
super::protocol::RequestVote,
crate::{
PeerId,
groups::{
StateMachine,
Storage,
Term,
raft::{
Message,
leader::Leader,
protocol::Vote,
role::{Role, RoleHandlerError},
shared::Shared,
},
},
primitives::{Short, ShortFmtExt},
},
core::{
iter::once,
marker::PhantomData,
ops::ControlFlow,
pin::Pin,
task::{Context, Poll},
},
std::collections::HashSet,
tokio::time::{Sleep, sleep},
};
/// Internal state for the candidate role that is currently running elections
/// for its leadership candidacy.
///
/// One value of this type tracks a single election round. When the election
/// timeout of a round elapses without a quorum, `poll` replaces it with a
/// fresh `Candidate` for a later term instead of resetting it in place.
#[derive(Debug)]
pub struct Candidate<M: StateMachine> {
/// The term in which this election round is being held.
term: Term,
/// The set of peers from which votes have been requested. The local node is
/// always a member of this set, so it counts towards the committee size.
requested_from: HashSet<PeerId>,
/// The set of peers that have granted their vote. It starts out containing
/// the local node, which always votes for itself.
granted: HashSet<PeerId>,
/// The set of peers that have abstained from voting. Abstaining peers are
/// subtracted from the committee size when the quorum is computed.
abstained: HashSet<PeerId>,
/// The set of peers that have explicitly denied their vote.
denied: HashSet<PeerId>,
/// The election timeout for the current election round. If this timeout
/// elapses without reaching a quorum, a new election round will be started.
election_timeout: Pin<Box<Sleep>>,
/// Wakers of tasks that polled this role and got `Poll::Pending`. They are
/// drained and woken by `receive_protocol_message` once the votes received
/// so far either reach a quorum or amount to a lost election.
wakers: Vec<std::task::Waker>,
/// Ties the candidate to its state machine type without storing a value of
/// that type.
#[doc(hidden)]
_phantom: PhantomData<M>,
}
impl<M: StateMachine> Candidate<M> {
/// Creates a new candidate role for the specified term and starts the
/// election process by sending `RequestVote` messages to all bonded peers in
/// the group.
pub fn new<S: Storage<M::Command>>(
term: Term,
shared: &mut Shared<S, M>,
) -> Self {
assert_ne!(
term,
Term::zero(),
"Candidate role should be at least in term 1"
);
// nodes in candidate state are always considered offline
shared.set_offline();
// make sure that we can vote in this term by checking if we have not
// already voted for another candidate in this term.
let mut term = term;
loop {
if shared.can_vote(term, shared.local_id()) {
break;
}
term = term.next();
}
let election_timeout = shared.consensus().election_timeout();
let election_timeout = Box::pin(sleep(election_timeout));
let candidate = shared.local_id();
let log_position = shared.storage.last();
let request = RequestVote {
term,
candidate,
log_position,
};
let labels = [
("network", shared.network_id().short().to_string()),
("group", shared.group_id().short().to_string()),
];
metrics::counter!("mosaik.groups.raft.elections", &labels).increment(1);
tracing::debug!(
term = %term,
log = %log_position,
group = %Short(shared.group_id()),
network = %Short(shared.network_id()),
"starting new leader elections",
);
// Broadcast the `RequestVote` message to all bonded peers in the group.
let requested_from = shared
.bonds()
.broadcast_raft(Message::RequestVote(request))
.expect("RequestVote serialization should not fail");
let requested_from =
requested_from.into_iter().chain(once(candidate)).collect();
// We start with granting our own vote to ourselves.
let votes_granted = once(candidate).collect();
shared.save_vote(term, candidate);
Self {
term,
requested_from,
granted: votes_granted,
election_timeout,
wakers: Vec::new(),
abstained: HashSet::new(),
denied: HashSet::new(),
_phantom: PhantomData,
}
}
}
impl<M: StateMachine> Candidate<M> {
    /// As a candidate, we start elections and wait for votes from other nodes or
    /// `AppendEntries` from a leader. If no quorum is reached within the election
    /// timeout, we start a new election.
    ///
    /// Returns `Poll::Ready(ControlFlow::Break(new_role))` if the role should
    /// transition to a new state (leader once a quorum is reached, or a new
    /// candidate for a later term once the election timeout elapses), or
    /// `Poll::Pending` if it should continue waiting in the candidate state.
    ///
    /// When `Poll::Pending` is returned the task's waker is remembered so that
    /// `receive_protocol_message` can wake it once the outcome is known.
    pub fn poll<S: Storage<M::Command>>(
        &mut self,
        cx: &mut Context,
        shared: &mut Shared<S, M>,
    ) -> Poll<ControlFlow<Role<M>>> {
        if self.quorum_reached() {
            tracing::debug!(
                term = %self.term(),
                group = %Short(shared.group_id()),
                network = %Short(shared.network_id()),
                votes = self.format_vote_counts(),
                "quorum reached, becoming leader",
            );
            return Poll::Ready(ControlFlow::Break(
                Leader::new(self.term, self.granted.clone(), shared).into(),
            ));
        }

        // NOTE(review): `elections_lost()` wakes the waiting tasks, but this
        // method does not act on a lost election; the candidate keeps waiting
        // for the election timeout below. Confirm that this is intended.
        if self.election_timeout.as_mut().poll(cx).is_ready() {
            // election timeout elapsed without reaching a quorum, so we start a new
            // election by incrementing the term and broadcasting new RequestVote
            // message to all bonded peers in the group.
            tracing::debug!(
                term = %self.term(),
                group = %Short(shared.group_id()),
                network = %Short(shared.network_id()),
                committee_size = self.requested_from.len(),
                votes = self.format_vote_counts(),
                "quorum not reached",
            );
            return Poll::Ready(ControlFlow::Break(
                Self::new(self.term.next(), shared).into(),
            ));
        }

        // Register the waker for this task so that it can be woken up when we
        // receive enough votes to know the outcome. The list is only drained
        // once an outcome is known, so a task that polls repeatedly must not
        // add a new clone on every poll: skip wakers that would wake the same
        // task as one we already hold.
        let waker = cx.waker();
        if !self.wakers.iter().any(|known| known.will_wake(waker)) {
            self.wakers.push(waker.clone());
        }
        Poll::Pending
    }

    /// The current term of this elections round.
    pub const fn term(&self) -> Term {
        self.term
    }

    /// When in a candidate state, we only expect to receive `RequestVoteResponse`
    /// messages from other nodes.
    ///
    /// A `RequestVoteResponse` from a peer we asked for a vote is recorded as
    /// granted, abstained or denied; responses from other peers are logged and
    /// ignored. Tasks waiting in `poll` are woken as soon as the recorded votes
    /// reach a quorum or amount to a lost election.
    ///
    /// Returns `Ok(())` if the message was handled by the candidate.
    ///
    /// # Errors
    ///
    /// * `RoleHandlerError::StepDown` carrying the request when an
    ///   `AppendEntries` message is received.
    /// * `RoleHandlerError::Unexpected` carrying the message for any other
    ///   message kind, which is not handled by this role.
    pub fn receive_protocol_message<S: Storage<M::Command>>(
        &mut self,
        message: Message<M>,
        sender: PeerId,
        shared: &Shared<S, M>,
    ) -> Result<(), RoleHandlerError<M>> {
        let response = match message {
            // Raft paper 5.2: A candidate continues in the candidate state until it
            // either wins the election and becomes leader, or discovers that another
            // server has become leader or has a higher term, in which case it
            // immediately returns to follower state.
            Message::RequestVoteResponse(response) => response,
            // Raft paper 5.2: While waiting for votes, a candidate may receive an
            // AppendEntries RPC from another server claiming to be leader. If the
            // leader's term (included in its RPC) is at least as large as the
            // candidate's current term, then the candidate recognizes the leader as
            // legitimate and returns to follower state.
            Message::AppendEntries(request) => {
                return Err(RoleHandlerError::<M>::StepDown(request));
            }
            other => return Err(RoleHandlerError::<M>::Unexpected(other)),
        };

        // Only peers that were sent a `RequestVote` in this round may vote.
        if !self.requested_from.contains(&sender) {
            tracing::warn!(
                from = %Short(sender),
                term = %response.term,
                group = %Short(shared.group_id()),
                network = %Short(shared.network_id()),
                "ignoring unsolicited vote",
            );
            return Ok(());
        }

        // NOTE(review): `response.term` is only logged here and is not compared
        // with `self.term`. Presumably responses from other terms are filtered
        // out before this role sees them - verify against the caller.
        match response.vote {
            Vote::Granted => {
                // If the peer granted its vote, we add it to the set of votes granted
                // to us. We check whether a quorum has been reached after processing
                // the vote and wake up any waiting tasks.
                self.granted.insert(sender);
                tracing::debug!(
                    peer = %Short(sender),
                    term = %response.term,
                    group = %Short(shared.group_id()),
                    network = %Short(shared.network_id()),
                    votes = self.format_vote_counts(),
                    "vote granted by",
                );
            }
            Vote::Abstained => {
                // If the peer abstained from voting, we don't count it towards our
                // granted votes, but we also don't count it against us. We still
                // need a quorum of granted votes from the remaining peers to win
                // the election; the abstaining peer is removed from the quorum
                // denominator (see `quorum`).
                self.abstained.insert(sender);
                tracing::debug!(
                    peer = %Short(sender),
                    term = %response.term,
                    group = %Short(shared.group_id()),
                    network = %Short(shared.network_id()),
                    votes = self.format_vote_counts(),
                    "vote abstained by",
                );
            }
            Vote::Denied => {
                // The peer explicitly denied its vote, so we add it to the set of
                // denied votes. Once a quorum of peers has denied us the election
                // is considered lost (see `elections_lost`); until then we keep
                // waiting for the remaining responses.
                self.denied.insert(sender);
                tracing::debug!(
                    peer = %Short(sender),
                    term = %response.term,
                    group = %Short(shared.group_id()),
                    network = %Short(shared.network_id()),
                    votes = self.format_vote_counts(),
                    "vote denied by",
                );
            }
        }

        if self.quorum_reached() || self.elections_lost() {
            // we know the outcome of the election, so we can wake up any waiting
            // tasks to transition to a new role.
            for waker in self.wakers.drain(..) {
                waker.wake();
            }
        }

        Ok(())
    }

    /// Checks if we have received votes from a quorum of peers to win the
    /// leader election.
    fn quorum_reached(&self) -> bool {
        self.granted.len() >= self.quorum()
    }

    /// Checks if we have received enough denied votes to lose the leader
    /// election.
    fn elections_lost(&self) -> bool {
        self.denied.len() >= self.quorum()
    }

    /// Renders the current tally for log output as
    /// `[granted+/denied-/abstained?/n=committee,q=quorum/percent%]`, where
    /// `percent` is the number of granted votes relative to the quorum.
    fn format_vote_counts(&self) -> String {
        // `quorum()` is at least 1 by construction, so this division cannot
        // fail. The result is an integer percentage.
        let percent = (self.granted.len() * 100)
            .checked_div(self.quorum())
            .expect("we always vote for ourselves as a candidate");

        format!(
            "[{}+/{}-/{}?/n={},q={}/{:.1}%]",
            self.granted.len(),
            self.denied.len(),
            self.abstained.len(),
            self.requested_from.len(),
            self.quorum(),
            percent
        )
    }

    /// The number of votes required to reach a quorum based on the number of
    /// votes requested and the number of abstained votes.
    ///
    /// Abstaining peers are removed from the committee before the majority
    /// (`voters / 2 + 1`) is computed, so the result is always at least 1.
    fn quorum(&self) -> usize {
        let voters_count = self
            .requested_from
            .len()
            .saturating_sub(self.abstained.len());
        (voters_count / 2) + 1
    }
}