use dcs::communication::messages::Package;
use log::*;
use dcs::communication::service::CommunicationService;
use crate::server::*;
impl<T: Stopwatch, L: LogData> CandidateBehavior<T, L> for RaftService<T, L> {
fn parse_message(
&mut self,
package: RaftPackage<L>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
let Package { header, body } = package;
match body {
RaftMessage::RequestVote(_req) => {
if _req.term > self.term {
self.io.set_follower_timeout();
MemberState::Follower
} else {
MemberState::Candidate
}
}
RaftMessage::AppendLog(req) => {
<dyn candidate_behaviour::Candidate<T, L>>::handle_append_log(
self,
header,
req,
communication_service,
)
}
RaftMessage::ReadRequest => {
self.reject_read_request(header, communication_service);
self.current_role
}
RaftMessage::RequestVoteResponse(res) => {
self.update_ttl(&header);
<dyn candidate_behaviour::Candidate<T, L>>::handle_vote_response(
self,
header,
res,
communication_service,
)
}
m => {
warn!("Didn't expect {:?}", m);
self.current_role
}
}
}
fn after_tick(&mut self, communication_service: &mut dyn CommunicationService<RaftPackage<L>>) {
if self.io.timer.is_timeout() {
info!("Node #{} reached timeout.", self.id);
let im_the_only_member = self.cluster.is_empty();
if im_the_only_member {
<dyn candidate_behaviour::Candidate<T, L>>::become_leader(
self,
communication_service,
);
self.current_role = MemberState::Leader;
} else {
self.start_election(communication_service)
}
}
}
}
mod candidate_behaviour {
use dcs::communication::messages::{Header, PackageBuilder};
use dcs::communication::service::CommunicationService;
use crate::server::*;
use crate::RaftMessage::AppendLogResponse;
pub trait Candidate<T, L> {}
impl<T: Stopwatch, L: LogData> dyn Candidate<T, L> {
pub fn handle_append_log(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
request: AppendLogArgs<L>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
if !server.cluster.contains_key(&header.from) {
return server.current_role;
}
if request.term >= server.term {
Self::stop_election_and_become_follower(
server,
header,
request,
communication_service,
)
} else {
Self::reject_append_log_and_continue_election(server, header, communication_service)
}
}
fn stop_election_and_become_follower(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
request: AppendLogArgs<L>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
info!("Becoming follower of #{}.", header.from);
let leader_id = header.from.into();
let new_term = request.term;
server.become_follower_of(leader_id, new_term);
Self::reject_append_log(server, header, communication_service);
MemberState::Follower
}
fn reject_append_log(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
let reject_append_log = AppendLogResponse(AppendLogResponseResult {
term: server.term,
success: false,
});
let package = package(reject_append_log)
.from(server.id)
.to(header.from)
.build()
.unwrap();
communication_service.push(package);
}
fn reject_append_log_and_continue_election(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
Self::reject_append_log(server, header, communication_service);
MemberState::Candidate
}
pub fn handle_vote_response(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
RequestVoteResponseResult { granted, .. }: RequestVoteResponseResult,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
if !server.cluster.contains_key(&header.from) {
return server.current_role;
}
Self::update_election_count_with_new_vote(server, &header, granted);
Self::check_election_result(server, communication_service)
}
fn update_election_count_with_new_vote(
server: &mut RaftService<T, L>,
header: &Header<SystemNodeId>,
granted: bool,
) {
let member = server.cluster.get_mut(&header.from);
let vote = if granted {
ElectionVote::Granted
} else {
ElectionVote::Against
};
if let Some(m) = member {
m.vote_granted = vote
}
}
fn check_election_result(
server: &mut RaftService<T, L>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
let (granted, against, _total) = Self::compute_total_votes(server);
if Self::election_won(granted, against) {
Self::become_leader(server, communication_service);
MemberState::Leader
} else {
MemberState::Candidate
}
}
pub fn become_leader(
server: &mut RaftService<T, L>,
communication_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
info!("Becoming leader.");
debug!("[RAFT] Node #{} becoming leader.", server.id);
server.clean_state_from_previous_election();
server.reset_next_index();
server.reset_match_index();
server.commit_noop(communication_service);
server.io.set_heartbeat_timeout();
server.leader_id = Some(server.id)
}
fn election_won(granted: i32, total: i32) -> bool {
granted > total / 2
}
fn compute_total_votes(server: &mut RaftService<T, L>) -> (i32, i32, i32) {
let (granted, against, abstained) = server
.cluster
.values()
.map(|member| match member.vote_granted {
ElectionVote::Granted => (1, 0, 0),
ElectionVote::Against => (0, 1, 0),
ElectionVote::Abstained => (0, 0, 1),
})
.reduce(|accum, vote| (accum.0 + vote.0, accum.1 + vote.1, accum.2 + vote.2))
.unwrap();
let total = granted + against + abstained;
(granted, against, total)
}
}
}
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
extern crate alloc;
extern crate std;
use crate::assert_message;
use crate::messages::RaftMessage::AppendLogResponse;
use crate::messages::RaftMessage::RequestVote;
use crate::messages::*;
use crate::server::test_server_builder::ServerBuilder;
use crate::server::test_utils::*;
use core::fmt::Debug;
use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
use dcs::communication::messages::{Header, PackageBuilder};
use super::*;
#[test]
fn given_follower__when_timeout__then_send_request_vote_msg() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, _out_queue_test, mut server, _cluster, communication_service) =
builder.server_in_cluster();
timeout_follower(&mut server, communication_service);
server.tick(communication_service);
let actual_msg = read_msg_sent_by_server(&mut in_queue_test);
assert_message_is_vote_request(&actual_msg);
}
#[test]
fn given_candidate__when_starting_election__then_sends_vote_req_to_all_servers_in_cluster() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, _out_queue_test, mut server, cluster, communication_service) =
builder.server_in_cluster();
timeout_follower(&mut server, communication_service);
server.tick(communication_service);
let member_count = cluster.len();
for _ in 0..member_count - 1 {
let package = in_queue_test.pop().unwrap();
assert_message_is_vote_request(package.get_message());
assert!(cluster.contains_key(&cluster_id(package)));
}
}
fn cluster_id(package: RaftPackage<TestState>) -> SystemNodeId {
package.get_receiver()
}
#[test]
fn given_timer_is_up__when_server_ticks__then_increments_current_term() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, _out_queue_test, mut server, _cluster, communication_service) =
builder.server_in_cluster();
timeout_follower(&mut server, communication_service);
server.tick(communication_service);
let actual_msg = read_msg_sent_by_server(&mut in_queue_test);
assert_message! {
term in actual_msg == 1
}
}
#[test]
fn given_candidate__when_receives_appendlog__and_appendlog_term_is_GE_than_current_term__then_returns_to_follower_with_discovered_term(
) {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, mut out_queue_test, mut server, _cluster, communication_service) =
builder.candidate();
send_append_log(&mut out_queue_test, 2);
server.tick(communication_service);
let actual_msg = read_msg_sent_by_server(&mut in_queue_test);
match actual_msg {
AppendLogResponse(AppendLogResponseResult { term, .. }) => assert_eq!(2, term),
_ => fail(),
}
}
fn send_append_log<T: LogData + Debug + Debug + Default>(
out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
term: Term,
) {
send_append_log_with_header(
out_queue_test,
term,
Header {
from: 1.into(),
to: 2.into(),
},
)
}
#[test]
fn given_candidate__when_receives_appendlog__and_term_is_smaller_than_current_term__then_rejects_message(
) {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, mut out_queue_test, mut server, _cluster, communication_service) =
builder.candidate();
send_append_log(&mut out_queue_test, 0);
server.tick(communication_service);
let appendlog_response = read_msg_sent_by_server(&mut in_queue_test);
match appendlog_response {
AppendLogResponse(AppendLogResponseResult { success, .. }) => assert!(!success),
_ => fail(),
}
}
#[test]
fn given_candidate__when_times_out__then_start_new_election() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, _out_queue_test, mut server, _cluster, communication_service) =
builder.candidate();
timeout_candidate(&mut server, communication_service);
let actual_msg = read_msg_sent_by_server(&mut in_queue_test);
assert_message! {
term in actual_msg == 2
}
}
#[test]
fn given_follower__when_appendlog_is_received__then_response_goes_to_requester() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, mut out_queue_test, mut server, _cluster, communication_service) =
builder.server_in_cluster();
send_append_log_with_header(
&mut out_queue_test,
0,
Header {
from: 1.into(),
to: 0.into(),
},
);
server.tick(communication_service);
assert_eq!(SystemNodeId::from(1), get_destination_of_msg(&mut in_queue_test))
}
fn send_append_log_with_header<T: LogData + Debug + Debug + Debug + Default>(
out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
term: Term,
header: Header<SystemNodeId>,
) {
let hearbeat = AppendLog(AppendLogArgs {
term,
prev_log_index: 0,
prev_log_term: 0,
entries: crate::messages::Log::new(),
leader_commit: 0,
});
let heartbeat = package(hearbeat)
.from(header.from)
.to(header.to)
.build()
.unwrap();
out_queue_test.push(heartbeat);
}
#[test]
fn given_candidate__when_retrieves_votes_from_majority__then_sends_heartbeat_to_all() {
let mut builder = ServerBuilder::new();
let (mut in_queue_test, mut out_queue_test, mut server, cluster, communication_service) =
builder.candidate();
let server_id = server.id;
for member in cluster.values().filter(|member| member.id != server_id) {
let grant_vote = RaftMessage::RequestVoteResponse(RequestVoteResponseResult {
term: 1,
granted: true,
});
let msg = package(grant_vote)
.from(member.id)
.to(server.id)
.build()
.unwrap();
out_queue_test.push(msg);
server.tick(communication_service);
}
let package = in_queue_test.pop().unwrap();
match package.get_message() {
AppendLog(AppendLogArgs { term, .. }) => assert_eq!(1, *term),
_ => fail(),
}
}
}