1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use actix::prelude::*;
use crate::{
AppData, AppDataResponse, AppError, NodeId,
common::{DependencyAddr, UpdateCurrentLeader},
messages::{VoteRequest, VoteResponse},
network::RaftNetwork,
raft::{RaftState, Raft},
storage::RaftStorage,
};
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<VoteRequest> for Raft<D, R, E, N, S> {
type Result = ResponseActFuture<Self, VoteResponse, ()>;
fn handle(&mut self, msg: VoteRequest, ctx: &mut Self::Context) -> Self::Result {
if let &RaftState::Initializing = &self.state {
return Box::new(fut::err(()));
}
Box::new(fut::result(self.handle_vote_request(ctx, msg)))
}
}
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Raft<D, R, E, N, S> {
fn handle_vote_request(&mut self, ctx: &mut Context<Self>, msg: VoteRequest) -> Result<VoteResponse, ()> {
if !self.membership.contains(&msg.candidate_id) {
return Ok(VoteResponse{term: self.current_term, vote_granted: false, is_candidate_unknown: true});
}
if &msg.term < &self.current_term {
return Ok(VoteResponse{term: self.current_term, vote_granted: false, is_candidate_unknown: false});
}
if &msg.term > &self.current_term {
self.update_current_term(msg.term, None);
self.save_hard_state(ctx);
}
let client_is_uptodate = (&msg.last_log_term >= &self.last_log_term) && (&msg.last_log_index >= &self.last_log_index);
if !client_is_uptodate {
return Ok(VoteResponse{term: self.current_term, vote_granted: false, is_candidate_unknown: false});
}
match &self.voted_for {
Some(candidate_id) if candidate_id == &msg.candidate_id => {
Ok(VoteResponse{term: self.current_term, vote_granted: true, is_candidate_unknown: false})
}
Some(_) => Ok(VoteResponse{term: self.current_term, vote_granted: false, is_candidate_unknown: false}),
None => {
self.voted_for = Some(msg.candidate_id);
self.save_hard_state(ctx);
self.update_election_timeout_stamp();
self.become_follower(ctx);
Ok(VoteResponse{term: self.current_term, vote_granted: true, is_candidate_unknown: false})
},
}
}
pub(super) fn request_vote(&mut self, _: &mut Context<Self>, target: NodeId) -> impl ActorFuture<Actor=Self, Item=(), Error=()> {
let rpc = VoteRequest::new(target, self.current_term, self.id, self.last_log_index, self.last_log_term);
fut::wrap_future(self.network.send(rpc))
.map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftNetwork))
.and_then(|res, _, _| fut::result(res))
.and_then(move |res, act, ctx| {
let state = match &mut act.state {
RaftState::Candidate(state) => state,
_ => {
return fut::ok(());
}
};
if res.is_candidate_unknown && act.last_log_index > 0 {
act.become_non_voter(ctx);
return fut::ok(());
}
if res.term > act.current_term {
act.update_current_term(res.term, None);
act.update_current_leader(ctx, UpdateCurrentLeader::Unknown);
act.become_follower(ctx);
act.save_hard_state(ctx);
return fut::ok(());
}
if res.vote_granted {
state.votes_granted += 1;
if state.votes_granted >= state.votes_needed {
act.become_leader(ctx);
}
}
fut::ok(())
})
}
}