1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
use {
crate::{
PeerId,
groups::{
Index,
IndexRange,
StateMachine,
Storage,
SyncContext,
Term,
raft::{
Message,
protocol::{AppendEntries, Forward, LogEntry, Vote},
role::{Role, RoleHandlerError},
shared::Shared,
},
},
primitives::{
Encoded,
FmtIter,
Pretty,
Short,
ShortFmtExt,
UnboundedChannel,
},
},
core::{
cmp::Reverse,
iter::once,
ops::ControlFlow,
pin::Pin,
task::{Context, Poll},
time::Duration,
},
std::{
collections::{HashMap, HashSet},
time::Instant,
},
tokio::time::{Sleep, sleep},
};
/// In the leader role, the node is active and handles log-mutating requests
/// from clients, replicates log entries, and sends periodic heartbeats to
/// followers if no log entries are being replicated within the configured
/// heartbeat interval. If the leader receives an `AppendEntries` message from
/// another leader with a higher term, it steps down to follower state and
/// follows that leader.
#[derive(Debug)]
pub struct Leader<M: StateMachine> {
    /// The current term for this node.
    term: Term,
    /// The interval at which the leader sends heartbeats to followers if no log
    /// entries are being replicated.
    heartbeat_interval: Duration,
    /// Fires at the configured interval to trigger sending empty
    /// `AppendEntries` heartbeats to followers if no log entries are being
    /// replicated within the heartbeat interval. Pinned because the `Sleep`
    /// future is polled in place across leader ticks.
    heartbeat_timeout: Pin<Box<Sleep>>,
    /// Pending client commands that have not yet been replicated to followers.
    /// These commands will be included in the next `AppendEntries` message sent
    /// to followers.
    client_commands: UnboundedChannel<M::Command>,
    /// The current voting committee that is used to determine the quorum for
    /// elections and log replication.
    committee: Committee,
    /// Wakers for tasks that are waiting for the leader to either send
    /// heartbeats or replicate log entries. Drained (and woken) whenever new
    /// client commands arrive or the heartbeat timer is reset.
    wakers: Vec<std::task::Waker>,
}
impl<M: StateMachine> Leader<M> {
    /// Transitions into a new leader role for a new term.
    ///
    /// This will also inform the new leader about the peers that have granted
    /// their vote in the election, which optimizes the formation of the initial
    /// quorum.
    pub fn new(
        term: Term,
        voted_by: HashSet<PeerId>,
        shared: &Shared<impl Storage<M::Command>, M>,
    ) -> Self {
        let heartbeat_interval = shared.config().consensus().heartbeat_interval;
        // Seed the voting committee with the peers that already granted us
        // their vote during the election.
        let committee = Committee::new(voted_by, shared);
        // Announce leadership to the group. This causes followers to update
        // their leader information and start following us.
        shared.update_leader(Some(shared.local_id()));
        shared.set_online();
        Self {
            term,
            heartbeat_interval,
            heartbeat_timeout: Box::pin(sleep(heartbeat_interval)),
            client_commands: UnboundedChannel::default(),
            committee,
            wakers: Vec::new(),
        }
    }

    /// Returns the current term.
    pub const fn term(&self) -> Term {
        self.term
    }
}
impl<M: StateMachine> Leader<M> {
    /// As a leader, we send `AppendEntries` with new log entries or as heartbeats
    /// to all followers. We also handle client requests for log mutations.
    ///
    /// If we receive an `AppendEntries` from another leader with a higher term,
    /// we step down to follower state and follow that leader.
    ///
    /// Returns `Poll::Ready(ControlFlow::Break(new_role))` if the role should
    /// transition to a new state (e.g., follower) or `Poll::Pending` if it
    /// should continue waiting in the leader state.
    pub fn poll<S: Storage<M::Command>>(
        &mut self,
        cx: &mut Context,
        shared: &mut Shared<S, M>,
    ) -> Poll<ControlFlow<Role<M>>> {
        if self.poll_pending_entries(cx, shared).is_ready() {
            // this tick was spent publishing pending client commands to followers
            return Poll::Ready(ControlFlow::Continue(()));
        }
        if self.poll_next_heartbeat(cx, shared).is_ready() {
            // this tick was spent sending a heartbeat to followers
            // since no commands were published, and the heartbeat timeout elapsed.
            return Poll::Ready(ControlFlow::Continue(()));
        }
        // store a waker for this task so we can wake it up when new client
        // commands are added or when the heartbeat timeout elapses to trigger
        // the next tick.
        self.wakers.push(cx.waker().clone());
        Poll::Pending
    }

    /// As a leader we are only interested in receiving `AppendEntriesResponse`
    /// messages from followers to track their replication progress and update our
    /// commit index.
    ///
    /// Returns Ok(()) if the message was handled by the leader, or Err(message)
    /// if the message was unexpected and was not handled by this role.
    pub fn receive_protocol_message<S: Storage<M::Command>>(
        &mut self,
        message: Message<M>,
        sender: PeerId,
        shared: &mut Shared<S, M>,
    ) -> Result<(), RoleHandlerError<M>> {
        match message {
            // sent by followers in response to our `AppendEntries` messages.
            Message::AppendEntriesResponse(response) => {
                match response.vote {
                    // The follower is up to date with our log and can be part of the
                    // current voting committee. Record its vote and its log progress
                    // for commit index tracking.
                    Vote::Granted => {
                        self.record_vote(sender, response.last_log_index, shared);
                    }
                    // The follower is not up to date with our log or has some other issue
                    // that prevents it from being part of the current voting committee.
                    // Remove it from the committee so it is not considered for quorum
                    // calculations until it catches up with our log and can grant its
                    // vote again.
                    Vote::Abstained => self.record_no_vote(sender, true, shared),
                    // the follower explicitly denied our `AppendEntries`
                    Vote::Denied => self.record_no_vote(sender, false, shared),
                }
            }
            // if we're a leader and we're receiving a message with the same term from
            // another leader, this means that the group has two rival leaders, this
            // indicates a network partition. Trigger new elections with a higher
            // term.
            Message::AppendEntries(request) if request.term == self.term() => {
                return Err(RoleHandlerError::RivalLeader(request));
            }
            // sent by followers that are forwarding client commands to the leader.
            Message::Forward(Forward::Command {
                commands,
                request_id,
            }) => {
                if !commands.is_empty() {
                    let assigned =
                        self.enqueue_commands(commands.into_iter().map(|e| e.0), shared);
                    if let Some(request_id) = request_id {
                        // the follower is interested in knowing the log index assigned to
                        // this command asap.
                        shared
                            .bonds()
                            .send_raft_to(
                                Message::Forward(Forward::CommandAck {
                                    request_id,
                                    assigned,
                                }),
                                sender,
                            )
                            .expect("infallible serialization");
                    }
                }
            }
            // sent by followers that are forwarding client queries to the leader.
            // todo: run queries in parallel.
            Message::Forward(Forward::Query { query, request_id }) => {
                let result = shared.state_machine().query(query.0);
                let position = shared.committed().index();
                if let Err(e) = shared.bonds().send_raft_to(
                    Message::Forward(Forward::QueryResponse {
                        request_id,
                        result: Encoded(result),
                        position,
                    }),
                    sender,
                ) {
                    tracing::warn!(
                        err = ?e,
                        request_id = %request_id,
                        position = %position,
                        group = %Short(shared.group_id()),
                        network = %Short(shared.network_id()),
                        "failed to serialize query response",
                    );
                }
            }
            // all other message types are unexpected in the leader state. ignore.
            message => {
                return Err(RoleHandlerError::<M>::Unexpected(message));
            }
        }
        Ok(())
    }

    /// Adds a new client command to the list of pending commands that will be
    /// included in the next `AppendEntries` message sent to followers. This
    /// method is called when the leader receives a client request.
    ///
    /// This will wake up any pending wakers that are waiting on the next leader
    /// tick.
    ///
    /// Returns the index of the log entry that will be assigned to the last
    /// command in the batch once it is appended to the log. This allows the
    /// caller to track the progress of their command and know when it has been
    /// committed and applied to the state machine.
    ///
    /// If the batch of commands is empty it returns [0,0].
    pub fn enqueue_commands<S: Storage<M::Command>>(
        &mut self,
        commands: impl IntoIterator<Item = M::Command>,
        shared: &Shared<S, M>,
    ) -> IndexRange {
        // the index the last enqueued-but-unreplicated command will occupy:
        // the last persisted index plus everything already queued ahead of us.
        let last_index = shared.storage.last().index();
        let pending_commands_count = self.client_commands.len();
        let last_index = last_index + pending_commands_count;
        for command in commands {
            self.client_commands.send(command);
        }
        let new_commands_count = self
            .client_commands
            .len()
            .saturating_sub(pending_commands_count);
        if new_commands_count != 0 {
            let new_position = last_index + new_commands_count;
            let assigned_range = last_index.next()..=new_position;
            // wake the poll loop so the next tick publishes the new commands.
            for waker in self.wakers.drain(..) {
                waker.wake();
            }
            return assigned_range;
        }
        IndexRange::new(Index::zero(), Index::zero())
    }

    /// Records a positive vote from a follower that has acknowledged our
    /// `AppendEntries` message and is up to date with our log.
    fn record_vote<S: Storage<M::Command>>(
        &mut self,
        follower: PeerId,
        log_index: Index,
        shared: &mut Shared<S, M>,
    ) {
        // purge voters that are down and try to backfill voters from online
        // non-voters.
        self.committee.remove_dead_voters(shared);
        let quorum_index = self.committee.record_vote(follower, log_index);
        self
            .committee
            .emit_metrics(shared.network_id(), shared.group_id());
        self.advance_commit_index(quorum_index, shared);
    }

    /// Records a negative vote from a follower that has not acknowledged our
    /// `AppendEntries` message or is not up to date with our log.
    fn record_no_vote<S: Storage<M::Command>>(
        &mut self,
        follower: PeerId,
        abstained: bool,
        shared: &mut Shared<S, M>,
    ) {
        if abstained {
            // the follower didn't explicitly deny our `AppendEntries`, but it also
            // didn't grant its vote, which means it is not up to date with our log
            // and can't be part of the current voting committee until it catches up.
            // Remove it from the committee so it is not considered for quorum
            // calculations until it can grant its vote again.
            self.committee.remove(follower);
        }
        // purge voters that are down and try to backfill voters from online
        // non-voters.
        self.committee.remove_dead_voters(shared);
        self
            .committee
            .emit_metrics(shared.network_id(), shared.group_id());
        // see if after purging dead voters and removing this non-voting follower,
        // we have reached a quorum with a smaller committee and can advance the
        // commit index.
        let quorum_index = self.committee.highest_quorum_index();
        self.advance_commit_index(quorum_index, shared);
    }

    /// Advances the commit index up to `quorum_index` — the latest index that
    /// has been replicated to a quorum of the committee — and performs the
    /// bookkeeping that must follow every commit advancement. Shared by
    /// [`Self::record_vote`] and [`Self::record_no_vote`] so the two paths
    /// cannot drift apart.
    fn advance_commit_index<S: Storage<M::Command>>(
        &mut self,
        quorum_index: Index,
        shared: &mut Shared<S, M>,
    ) {
        let prev_committed = shared.committed();
        if prev_committed == quorum_index {
            return;
        }
        let new_committed = shared.commit_up_to(quorum_index);
        if new_committed == prev_committed {
            return;
        }
        // advance the commit index up to the latest index that has reached a
        // quorum of voters in the committee.
        tracing::trace!(
            committed = %new_committed,
            log_position = %shared.storage.last(),
            group = %Short(shared.group_id()),
            network = %Short(shared.network_id()),
        );
        shared.update_committed(new_committed);
        // check with the state sync provider if we can prune any log entries up
        // to the new committed index after advancing the commit index.
        shared.prune_safe_prefix();
        // Trigger an immediate heartbeat so followers learn about the
        // new commit index without waiting for the next heartbeat
        // interval. This is important for state sync: when a
        // TakeSnapshot command is committed, all in-sync followers
        // should process it promptly so they can start serving
        // snapshot data to catching-up peers in parallel with the
        // leader.
        self.schedule_immediate_heartbeat();
    }
}
// internal impl
impl<M: StateMachine> Leader<M> {
    /// Checks if there are any pending client commands and publishes them to all
    /// followers as `AppendEntries` message.
    fn poll_pending_entries<S: Storage<M::Command>>(
        &mut self,
        cx: &mut Context,
        shared: &mut Shared<S, M>,
    ) -> Poll<()> {
        if self.client_commands.is_empty() {
            // no pending client commands to publish, just wait for the next heartbeat
            // timeout or new client commands to arrive to publish `AppendEntries` to
            // followers.
            return Poll::Pending;
        }
        let count = self.client_commands.len();
        let mut entries = Vec::with_capacity(count);
        if self
            .client_commands
            .poll_recv_many(cx, &mut entries, count)
            .is_pending()
        {
            return Poll::Pending;
        }
        let prev_pos = shared.storage.last();
        // append the new client commands to our log before broadcasting them to
        // followers but without committing them yet.
        for command in &entries {
            shared.storage.append(command.clone(), self.term);
        }
        // signal log update to public api observers
        shared.update_log_pos(shared.storage.last());
        let message = Message::AppendEntries(AppendEntries {
            term: self.term,
            leader_commit: shared.committed().index(),
            leader: shared.local_id(),
            prev_log_position: prev_pos,
            entries: entries
                .into_iter()
                .map(|c| LogEntry {
                    command: Encoded(c),
                    term: self.term,
                })
                .collect(),
        });
        // broadcast the new log entries to all followers.
        let Ok(followers) =
            shared.bonds().broadcast_raft(message).inspect_err(|e| {
                tracing::warn!(
                    error = %e,
                    range = %Pretty(&(prev_pos.index().next()..=prev_pos.index() + count)),
                    group = %Short(shared.group_id()),
                    network = %Short(shared.network_id()),
                    "unable to serialize log entries for replication; reverting and dropping batch"
                );
            })
        else {
            // failed to serialize commands, revert the log to the previous position
            // before the failed batch append and drop the batch of commands.
            shared.storage.truncate(prev_pos.index().next());
            // observers were already notified of the (now reverted) appended
            // entries above; re-signal the log position so they don't keep a
            // stale, too-far view of the log.
            shared.update_log_pos(shared.storage.last());
            return Poll::Ready(());
        };
        // always vote for our own `AppendEntries` messages. If we are the
        // only voter in the committee, this will immediately commit the new log
        // entries.
        self.record_vote(shared.local_id(), prev_pos.index() + count, shared);
        if !followers.is_empty() {
            let range = prev_pos.index().next()..=prev_pos.index() + count;
            tracing::trace!(
                followers = %FmtIter::<Short<_>, _>::new(followers),
                ix_range = %Pretty(&range),
                group = %Short(shared.group_id()),
                network = %Short(shared.network_id()),
                "broadcasting {count} new log entries to",
            );
        }
        self.reset_heartbeat_timeout();
        Poll::Ready(())
    }

    /// Checks if the heartbeat timeout has elapsed without any new client command
    /// being published.
    fn poll_next_heartbeat<S: Storage<M::Command>>(
        &mut self,
        cx: &mut Context,
        shared: &Shared<S, M>,
    ) -> Poll<()> {
        if self.heartbeat_timeout.as_mut().poll(cx).is_pending() {
            return Poll::Pending;
        }
        let prev_pos = shared.storage.last();
        // an empty `AppendEntries` acts as a heartbeat and also carries the
        // latest `leader_commit` to followers.
        let heartbeat = AppendEntries::<M::Command> {
            term: self.term,
            leader: shared.local_id(),
            prev_log_position: prev_pos,
            entries: Vec::new(),
            leader_commit: shared.committed().index(),
        };
        shared
            .bonds()
            .broadcast_raft(Message::AppendEntries(heartbeat))
            .expect("infallible serialization");
        self.reset_heartbeat_timeout();
        Poll::Ready(())
    }

    /// Called every time we send `AppendEntries` to followers.
    fn reset_heartbeat_timeout(&mut self) {
        let next_heartbeat = Instant::now() + self.heartbeat_interval;
        self.heartbeat_timeout.as_mut().reset(next_heartbeat.into());
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Resets the heartbeat timer to fire immediately and wakes the poll loop
    /// so that the next leader tick sends an empty `AppendEntries` with the
    /// latest `leader_commit` to all followers. This ensures that followers
    /// learn about newly committed entries without waiting for the regular
    /// heartbeat interval.
    fn schedule_immediate_heartbeat(&mut self) {
        // only needed when there are no pending commands: if commands are
        // queued, the next tick will broadcast them (with the new commit
        // index) anyway.
        if self.client_commands.is_empty() {
            self.heartbeat_timeout.as_mut().reset(Instant::now().into());
            for waker in self.wakers.drain(..) {
                waker.wake();
            }
        }
    }
}
/// The subset of followers whose acknowledgements count towards quorum for
/// commit-index advancement during this leader's term.
#[derive(Debug)]
struct Committee {
    /// Map of current voters in the committee to their last acknowledged log
    /// index, which is used to track their replication progress and determine
    /// the commit index based on the highest log index that has been replicated
    /// to a quorum of voters.
    voters: HashMap<PeerId, Index>,
    /// Map of non-voting followers that have acknowledged our `AppendEntries`
    /// and are caught up with our log, and can be promoted to voters if needed.
    non_voters: HashMap<PeerId, Index>,
    /// The maximum number of voters that can be part of the voting committee at
    /// any given time. This is used to limit the size of the voting committee
    /// and ensure that we can achieve quorum with a reasonable number of
    /// voters.
    max_committee_size: usize,
}
impl Committee {
/// Initializes a new voting committee at the beginning of a new leader term
/// based on the peers that granted their vote to us in the election.
///
/// the list of voters always includes the leader itself.
pub fn new<S: Storage<M::Command>, M: StateMachine>(
voters: HashSet<PeerId>,
shared: &Shared<S, M>,
) -> Self {
let last_log_index = shared.storage.last().index();
let voters = voters
.into_iter()
.map(|voter| (voter, last_log_index))
.collect();
Self {
voters,
non_voters: HashMap::new(),
max_committee_size: 5, // todo: move to config
}
}
/// Records a vote from a follower that has acknowledged our `AppendEntries`
/// message and is up to date with our log.
///
/// Followers that grant positive votes can be considered as part of the
/// current voting committee.
///
/// Returns the index of the latest log entry that has been replicated to a
/// quorum of voters in the current voting committee after recording this
/// vote, which can be used by the leader to advance the commit index.
pub fn record_vote(&mut self, follower: PeerId, log_index: Index) -> Index {
// if the follower is already a voter, just update its last acknowledged log
// index.
if let Some(voter) = self.voters.get_mut(&follower) {
*voter = log_index;
} else if self.non_voters.insert(follower, log_index).is_none() {
// new non-voting follower that acknowledged our `AppendEntries`, try to
// promote it to a voter if we have room in the committee based on the
// target committee size.
self.try_backfill_voters();
}
// after recording this vote, check if we have reached a quorum with the
// current committee and can advance the commit index.
self.highest_quorum_index()
}
/// Removes voters from the committee that don't have active bonds with the
/// leader. This is a best-effort cleanup mechanism to prevent the committee
/// from being filled with voters that are not actively replicating log
/// entries and are effectively offline, which would reduce the leader's
/// ability to commit new log entries and make progress.
pub fn remove_dead_voters<S: Storage<M::Command>, M: StateMachine>(
&mut self,
shared: &Shared<S, M>,
) {
let active_bonds = shared
.bonds()
.iter()
.map(|bond| *bond.peer().id())
.chain(once(shared.local_id()))
.collect::<HashSet<_>>();
self.voters.retain(|voter, _| active_bonds.contains(voter));
self
.non_voters
.retain(|non_voter, _| active_bonds.contains(non_voter));
self.try_backfill_voters();
}
/// Removes a follower from the voting committee that has casted a negative
/// vote for our `AppendEntries` message.
pub fn remove(&mut self, follower: PeerId) {
self.voters.remove(&follower);
self.non_voters.remove(&follower);
self.try_backfill_voters();
}
/// Finds the highest log index that has been replicated to a quorum of voters
/// in the current voting committee.
pub fn highest_quorum_index(&self) -> Index {
let mut log_indices = self.voters.values().collect::<Vec<_>>();
log_indices.sort_unstable();
let quorum = (self.voters.len() / 2) + 1;
*log_indices[log_indices.len() - quorum]
}
/// We want to keep the voting committee capped at `self.max_committee_size`
/// voters to ensure we can achieve quorum with a reasonable latency, we also
/// want to have an odd number of voters to avoid split votes.
fn target_committee_size(&self) -> usize {
let total_followers = self.voters.len() + self.non_voters.len();
let capped = self.max_committee_size.min(total_followers);
if capped <= 2 { capped } else { capped | 1 }
}
/// Attempts to backfill voters from the non-voters if there is room in the
/// committee based on the target committee size.
fn try_backfill_voters(&mut self) {
// check if we should promote any non-voters to voters based on their log
// progress and the current size of the voting committee.
let target_committee_size = self.target_committee_size();
if self.voters.len() < target_committee_size {
// promote the most caught-up non-voters to fill the committee, but
// ensure the resulting voter count is always odd to avoid split votes.
let mut non_voters = self.non_voters.iter().collect::<Vec<_>>();
let deficit = target_committee_size - self.voters.len();
let available = non_voters.len().min(deficit);
// if promoting `available` non-voters would result in an even voter
// count, promote one fewer to keep it odd.
let promote_count = if (self.voters.len() + available).is_multiple_of(2) {
available.saturating_sub(1)
} else {
available
};
// sort non-voters by their log index in descending order to promote the
// most caught-up ones first.
non_voters.sort_by_key(|(_, log_index)| Reverse(*log_index));
let promoted: Vec<_> = non_voters
.into_iter()
.take(promote_count)
.map(|(id, idx)| (*id, *idx))
.collect();
for (non_voter, log_index) in promoted {
self.non_voters.remove(&non_voter);
self.voters.insert(non_voter, log_index);
}
}
}
/// Emits metrics about the current committee state.
fn emit_metrics(
&self,
network_id: &crate::NetworkId,
group_id: &crate::groups::GroupId,
) {
let labels = [
("network", network_id.short().to_string()),
("group", group_id.short().to_string()),
];
metrics::gauge!("mosaik.groups.raft.committee.voters", &labels)
.set(self.voters.len() as f64);
metrics::gauge!("mosaik.groups.raft.committee.non_voters", &labels)
.set(self.non_voters.len() as f64);
metrics::gauge!("mosaik.groups.raft.committee.size", &labels)
.set((self.voters.len() + self.non_voters.len()) as f64);
}
}