use dcs::communication::messages::Package;
use dcs::communication::service::CommunicationService;
use dcs::coordination::Stopwatch;
use crate::messages::{RaftMessage};
use crate::server::MemberState::Follower;
use crate::server::{FollowerBehavior, LogData, RaftPackage, RaftService, MemberState};
use log::*;
/// Follower-role message dispatch and per-tick housekeeping for a Raft node.
impl<T: Stopwatch, L: LogData> FollowerBehavior<T, L> for RaftService<T, L> {
    /// Dispatches one incoming package to the matching follower handler.
    ///
    /// Vote, append-log and install-snapshot requests are forwarded to the
    /// handlers in `follower_behaviour`; client read/write requests go to
    /// `reject_read_request` / `reject_write_request`; a `WriteRequestReply`
    /// is dropped silently and every other message is logged as unexpected.
    ///
    /// Always returns [`MemberState::Follower`].
    fn parse_message(
        &mut self,
        package: RaftPackage<L>,
        comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) -> MemberState {
        let Package { header, body } = package;
        match body {
            RaftMessage::RequestVote(req) => {
                <dyn follower_behaviour::Follower<T, L>>::handle_request_vote(
                    self,
                    header,
                    req,
                    comm_service,
                )
            }
            RaftMessage::AppendLog(req) => {
                <dyn follower_behaviour::Follower<T, L>>::handle_append_log(
                    self,
                    header,
                    req,
                    comm_service,
                )
            }
            RaftMessage::ReadRequest => self.reject_read_request(header, comm_service),
            RaftMessage::WriteRequestReply(_, _) => {}
            RaftMessage::WriteRequest(_args) => self.reject_write_request(header, comm_service),
            RaftMessage::InstallSnapshot(args) => {
                <dyn follower_behaviour::Follower<T, L>>::handle_install_snapshot(
                    self,
                    header,
                    args,
                    comm_service,
                )
            }
            msg => log::warn!("Unexpected message received as Follower: {:?}", msg),
        }
        Follower
    }

    /// Runs once per tick after message processing.
    ///
    /// Applies the most recent configuration change found in the log when it
    /// differs from the active configuration, then starts an election if the
    /// timer reports a timeout.
    fn after_tick(&mut self, comm_service: &mut dyn CommunicationService<RaftPackage<L>>) {
        let last_config = self
            .log
            .iter()
            .filter_map(|entry| entry.config_change.as_ref())
            .last();
        let current_config = self.current_config();
        // Clone the pending config so the borrow of `self.log` ends before
        // `update_config` needs `&mut self`.
        let pending_config = last_config
            .filter(|config| *config != &current_config)
            .cloned();
        if let Some(config) = pending_config {
            self.update_config(&config);
        }
        if self.io.timer.is_timeout() {
            self.start_election(comm_service)
        }
    }
}
mod follower_behaviour {
use std::cmp::min;
use dcs::communication::messages::{Header, PackageBuilder};
use dcs::communication::service::CommunicationService;
use log::{debug, info, trace};
use dcs::coordination::Stopwatch;
use dcs::nodes::SystemNodeId;
use crate::messages::RaftMessage::{AppendLogResponse, RequestVoteResponse};
use crate::messages::*;
use crate::server::ElectionVote::{Abstained, Granted};
use crate::server::{package, LogData, RaftPackage, RaftService};
/// Method-less trait used only as a namespace: the follower handlers in this
/// module are inherent associated functions on `dyn Follower<T, L>`.
pub trait Follower<T, L> {}
impl<T: Stopwatch, L: LogData> dyn Follower<T, L> {
/// Handles an `InstallSnapshot` request.
///
/// When the sender's term is not older than ours, the snapshot is installed
/// into the log and the sender-supplied `leader_id` is recorded. In every
/// case an `InstallSnapshotResponse` carrying the server's current term is
/// pushed back to the sender.
pub fn handle_install_snapshot(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    args: InstallSnapshotArgs<L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let sender_is_current = server.term <= args.term;
    if sender_is_current {
        let snapshot = args.data.clone();
        server.log.install_snapshot(args.last_included_index, snapshot);
        server.leader_id = Some(args.leader_id);
    }

    let reply = package(RaftMessage::InstallSnapshotResponse(server.term))
        .from(server.id.into())
        .to(header.from)
        .build()
        .unwrap();
    comm_service.push(reply);
}
/// Handles an `AppendLog` request.
///
/// * Senders that are not cluster members are ignored (no reply).
/// * A sender with an older term is rejected.
/// * A request whose `prev_log_index` / `prev_log_term` do not match the
///   local log is rejected after trimming the log tail.
/// * Otherwise the follower timeout is reset, election state is cleared,
///   the sender becomes the known leader, the entries are stored, the
///   commit index is set to `min(leader_commit, last local index)` and a
///   success reply is sent.
pub fn handle_append_log(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    args: AppendLogArgs<L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let sender_is_member = server.cluster.contains_key(&header.from);
    if !sender_is_member {
        debug!("Ignoring msg {:?} from {:?}", args, header.from);
        return;
    }

    if Self::sender_log_out_of_date(&server.term, &args.term) {
        debug!("Rejecting append log request: sender is out of date.");
        Self::reject_append_log(server, header, comm_service);
        return;
    }

    if Self::log_is_inconsistent(&args.prev_log_index, &server.log, &args.prev_log_term) {
        debug!("Rejecting append log request: log is inconsistent.");
        Self::remove_entry_and_all_after(&args.prev_log_index, &mut server.log);
        Self::reject_append_log(server, header, comm_service);
        return;
    }

    debug!("Accepting append log request.");
    server.io.set_follower_timeout();
    server.clean_state_from_previous_election();

    // Only announce the leader when it is new to us (none known, or a different one).
    let leader_changed = match server.leader_id {
        Some(current_leader) => current_leader != header.from,
        None => true,
    };
    if leader_changed {
        info!("Becoming follower of node #{}.", header.from);
    }
    server.leader_id = Some(header.from.into());

    if !args.entries.is_empty() {
        Self::append_entries(&mut server.log, &args);
    }

    let last_local_index = server.log.last_index() as LogIndex;
    server.commit_index = min(args.leader_commit, last_local_index);
    Self::accept_append_log(server, header, comm_service);
}
/// Returns `true` when the sender's term is strictly older than the server's.
fn sender_log_out_of_date(server_term: &Term, sender_term: &Term) -> bool {
    let sender_is_behind = sender_term < server_term;
    sender_is_behind
}
/// Replies to the sender with a successful `AppendLogResponse`.
fn accept_append_log(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let success = true;
    Self::answer_append_log(server, header, success, comm_service)
}
/// Pushes an `AppendLogResponse` with the server's current term and the
/// given `success` flag, addressed to the sender of `header`.
fn answer_append_log(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    success: bool,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let result = AppendLogResponseResult {
        term: server.term,
        success,
    };
    let reply = server
        .build_message(AppendLogResponse(result), header.from)
        .unwrap();
    comm_service.push(reply)
}
/// Inserts every entry of `args.entries` into `log`, at the positions that
/// directly follow `args.prev_log_index`.
///
/// Both the incoming entries and the target positions are addressed starting
/// at 1: entry `k` of the request is written to `prev_log_index + k`.
fn append_entries(log: &mut Log<L>, args: &AppendLogArgs<L>) {
    let starting_index = args.prev_log_index;
    debug!("Appending entries into log, starting at position {}.", starting_index);
    trace!("New entries: {:?}", args.entries);
    trace!("Log: {:?}", log);

    let base = starting_index as usize;
    for offset in 1..=args.entries.len() {
        let target = (base + offset) as LogIndex;
        let new_entry = args.entries.get(offset as LogIndex).unwrap().clone();
        log.insert(target, new_entry);
    }

    trace!("Updated log: {:?}", log);
}
/// Pops `log.len() - idx` entries off the tail of `log`; does nothing when
/// that difference is zero or negative.
///
/// NOTE(review): this leaves `idx` entries in place, so whether the entry at
/// `idx` itself is dropped depends on how `Log` numbers its entries — confirm
/// that this matches the function name.
fn remove_entry_and_all_after(idx: &LogIndex, log: &mut Log<L>) {
    let surplus = log.len() as i32 - *idx as i32;
    let mut removed = 0;
    while removed < surplus {
        log.pop();
        removed += 1;
    }
}
/// Negation of [`Self::log_is_consistent`].
fn log_is_inconsistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
    let consistent = Self::log_is_consistent(prev_log_index, logs, term);
    !consistent
}
/// Checks whether the local log agrees with the sender at `prev_log_index`.
///
/// Index `0` is always consistent. Otherwise the entry stored at
/// `prev_log_index` must exist and carry the given `term`.
fn log_is_consistent(prev_log_index: &LogIndex, logs: &Log<L>, term: &Term) -> bool {
    trace!("prev log idx {:?} term {:?}", prev_log_index, term);
    let index = *prev_log_index;
    if index == 0 {
        return true;
    }

    trace!("element at prev_log_idx - 1 {:?}", logs.get(index));
    trace!("log: {:?}", logs);

    match logs.get(index) {
        Some(entry) => entry.term == *term,
        None => false,
    }
}
/// Replies to the sender with an unsuccessful `AppendLogResponse`.
fn reject_append_log(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let success = false;
    Self::answer_append_log(server, header, success, comm_service)
}
/// Handles a `RequestVote` request.
///
/// Requests from non-members are ignored without a reply. The vote is
/// refused when any cluster member already holds this server's vote or
/// when the candidate's log is behind the local one; otherwise the follower
/// timeout is reset, the term is raised to the candidate's term if that is
/// higher, and the vote is granted.
pub fn handle_request_vote(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    args: RequestVoteArgs,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let sender_is_member = server.cluster.contains_key(&header.from);
    if !sender_is_member {
        debug!("Ignoring msg {:?} from {:?}", args, header.from);
        return;
    }

    let candidate_log_out_of_date = Self::candidate_log_is_out_of_date(
        &args.prev_log_index,
        &server.log,
        &args.prev_log_term,
    );
    let already_voted = Self::vote_granted_to_other_candidate(server);

    if already_voted || candidate_log_out_of_date {
        let reason = if candidate_log_out_of_date {
            "Log out of date"
        } else {
            "Granted to another candidate"
        };
        debug!("Rejecting vote request. {}", reason);
        Self::reject_vote_request(server, header, comm_service);
        return;
    }

    debug!("Granting vote request");
    server.io.set_follower_timeout();
    Self::update_term(server, args.term);
    Self::grant_vote_request(server, header, comm_service);
}
/// Returns `true` when any cluster member is currently marked as `Granted`.
fn vote_granted_to_other_candidate(server: &mut RaftService<T, L>) -> bool {
    debug!("vote_granted_to_other_candidate {:?}", &server.cluster);
    for member in server.cluster.values() {
        if member.vote_granted == Granted {
            return true;
        }
    }
    false
}
/// Raises the server's term to `term`; a lower or equal `term` is ignored.
fn update_term(server: &mut RaftService<T, L>, term: Term) {
    let is_newer = server.term < term;
    if !is_newer {
        return;
    }
    server.term = term;
}
/// Marks the requesting member as `Granted` (when it is a known member) and
/// sends it a positive `RequestVoteResponse` with the server's current term.
fn grant_vote_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let result = RequestVoteResponseResult {
        term: server.term,
        granted: true,
    };
    let answer = RequestVoteResponse(result);

    match server.cluster.get_mut(&header.from) {
        Some(candidate) => candidate.vote_granted = Granted,
        None => {}
    }

    let reply = server.build_message(answer, header.from).unwrap();
    comm_service.push(reply)
}
fn candidate_log_is_out_of_date(
prev_log_index: &LogIndex,
log: &Log<L>,
prev_log_term: &Term,
) -> bool {
let follower_last_entry_has_greater_term = log
.last()
.map(|entry| entry.term > *prev_log_term)
.unwrap_or(false);
let terms_are_equal = log
.last()
.map(|entry| entry.term == *prev_log_term)
.unwrap_or(false);
let follower_log_is_longer = log.len() > *prev_log_index as usize;
log::debug!(
"{} {} {}",
follower_last_entry_has_greater_term,
terms_are_equal,
follower_log_is_longer
);
follower_last_entry_has_greater_term || (terms_are_equal && follower_log_is_longer)
}
/// Sends the requester a negative `RequestVoteResponse` with the server's
/// current term.
fn reject_vote_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let result = RequestVoteResponseResult {
        term: server.term,
        granted: false,
    };
    let reply = server
        .build_message(RequestVoteResponse(result), header.from)
        .unwrap();
    comm_service.push(reply)
}
}
}
#[allow(non_snake_case)]
#[cfg(test)]
mod given_follower {
use core::fmt::Debug;
use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
use dcs::communication::messages::PackageBuilder;
use dcs::communication::service::CommunicationService;
use dcs::nodes::SystemNodeId;
use crate::messages::RaftMessage::{
AppendLog, AppendLogResponse, RequestVote, RequestVoteResponse,
};
use crate::messages::RaftMessage::{InstallSnapshot, WriteRequest};
use crate::messages::*;
use crate::messages::{InstallSnapshotArgs, LogEntry, LogIndex, RaftMessage};
use crate::server::follower::test_utils::FollowerContext;
use crate::server::test_server_builder::ContextBuilder;
use crate::server::test_server_builder::ServerBuilder;
use crate::server::test_utils::{fail, FakeTimer, TestState};
use crate::server::{package, LogData, RaftPackage, RaftService};
use dcs::rules::measurements::Measurement;
use dcs::rules::measurements::ClusterType::TEMPERATURE;
/// Canonical log payload used throughout the follower tests.
pub fn some_entry() -> Option<TestState> {
    let payload = TestState(1, 1);
    Some(payload)
}
mod receives_append_entry {
use super::*;
use crate::server::follower::test_utils::FollowerContext;
use crate::server::test_server_builder::ContextBuilder;
use crate::server::test_utils::almost_timeout_follower;
/// Delivering the very same append twice succeeds both times but stores the
/// entry only once.
#[test]
fn duplicated_entries_are_ignored() {
    let mut builder = ServerBuilder::new();
    let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();

    for _ in 0..2 {
        send_append_log(&mut outbox, 0, 0, some_entry());
    }
    for _ in 0..2 {
        server.tick(comm_service);
    }

    for _ in 0..2 {
        let reply = inbox.pop().unwrap();
        assert!(append_log_is_successful(reply.get_message()));
    }
    assert_eq!(some_entry(), server.log.get(1).unwrap().data);
    assert_eq!(None, server.log.get(2));
}
/// An append from the leader arriving just before the timeout must keep the
/// follower from starting an election.
#[test]
fn from_leader__then_resets_timer() {
    let mut builder = ServerBuilder::new();
    let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
    almost_timeout_follower(&mut server, comm_service);

    send_append_log(&mut outbox, 0, 0, some_entry());
    server.tick(comm_service);

    let stayed_follower = follower_didnt_start_election(&mut inbox);
    assert!(stayed_follower)
}
/// A rejected append (previous index beyond the empty log) leaves the commit
/// index untouched.
#[test]
fn if_invalid_then_dont_update_commit_idx() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();

    let entries = vec![(None, 1)];
    let (prev_log_index, prev_log_term, leader_commit) = (2, 0, 32);
    context.send_append_log_and_tick(entries, prev_log_index, prev_log_term, leader_commit);

    assert_eq!(context.inner.get_node().commit_index, 0)
}
/// With a leader commit beyond the log, the commit index follows the index of
/// the last of the three appended entries.
#[test]
fn with_multiple_entries_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    let entries: Vec<_> = (0..3).map(|_| (None, prev_log_term)).collect();
    context.send_append_log_and_tick(entries, 0, prev_log_term, 32);

    assert_eq!(context.inner.get_node().commit_index, 3)
}
/// With a leader commit beyond the log, the commit index follows the single
/// appended entry.
#[test]
fn with_single_entry_if_valid_then_updates_commit_index_with_last_new_entry_idx() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    let single_entry = vec![(None, prev_log_term)];
    context.send_append_log_and_tick(single_entry, 0, prev_log_term, 32);

    assert_eq!(context.inner.get_node().commit_index, 1)
}
/// When the leader's commit index is below the local last index, the leader's
/// value is the one that ends up stored.
#[test]
fn if_valid_then_updates_commit_index_with_leader_commit_idx() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    // (prev_log_index, leader_commit) of each consecutive append.
    for (prev_log_index, leader_commit) in [(0, 32), (1, 32), (2, 32), (3, 1)] {
        context.send_append_log_and_tick(
            vec![(None, prev_log_term)],
            prev_log_index,
            prev_log_term,
            leader_commit,
        );
    }

    assert_eq!(context.inner.get_node().commit_index, 1)
}
/// Append-log requests hitting a follower whose log is still empty.
mod with_empty_log {
    use super::*;

    /// A previous index past the end of the empty log is refused.
    #[test]
    fn and_entry_has_greater_nextIndex__then_request_fails() {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();

        send_append_log(&mut outbox, 2, 0, some_entry());
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(append_log_is_not_successful(reply.get_message()));
    }

    /// A previous index of zero is accepted.
    #[test]
    fn and_entry_has_correct_nextIndex_and_term_match__then__request_succeeds() {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();

        send_append_log(&mut outbox, 0, 0, some_entry());
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(append_log_is_successful(reply.get_message()));
    }
}
/// Append-log requests hitting a follower that already stores entries.
mod with_nonempty_log {
    use super::*;

    /// Matching index but a different term at that index is refused.
    #[test]
    fn when_receives_append_entry_with_correct_nextIndex_but_term_doesnt_match__then_request_fails(
    ) {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
        add_log_entry_to_server(
            &mut server,
            &mut outbox,
            &mut inbox,
            comm_service,
            some_entry(),
        );

        send_append_log(&mut outbox, 1, 99, some_entry());
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        match reply.get_message() {
            AppendLogResponse(AppendLogResponseResult { success, .. }) => assert!(!success),
            _ => fail(),
        }
    }

    /// An append landing on an already occupied position succeeds and
    /// replaces what was stored there.
    #[test]
    fn when_receives_append_entry_with_smaller_nextIndex__then_request_succeeds_and_log_is_updated(
    ) {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
        let initial_entries = vec![some_entry(), some_other_entry()];
        add_log_entries_to_server(
            &mut server,
            &mut outbox,
            &mut inbox,
            comm_service,
            initial_entries,
        );

        send_append_log(&mut outbox, 1, 1, some_entry());
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(append_log_is_successful(reply.get_message()));
        for position in [1, 2] {
            assert_eq!(some_entry(), server.log.get(position).unwrap().data);
        }
    }

    /// Matching index and term is accepted.
    #[test]
    fn when_receives_append_entry_with_correct_nextIndex_and_term__then_request_succeeds() {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
        add_log_entry_to_server(
            &mut server,
            &mut outbox,
            &mut inbox,
            comm_service,
            some_entry(),
        );

        send_append_log(&mut outbox, 1, 1, some_entry());
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(append_log_is_successful(reply.get_message()));
    }
}
/// Configuration changes delivered through the log.
mod receives_new_config {
    use super::*;

    /// A config entry received via append-log shrinks the cluster right away.
    #[test]
    fn applies_it_immediately() {
        let mut builder = ContextBuilder::new();
        let mut context = builder.follower();

        let mut new_config = context.current_config();
        new_config.remove(2);
        let config_entry =
            LogEntry::<TestState>::with_config(context.term(), Some(new_config));
        let mut entries = Log::new();
        entries.push(config_entry);
        let request = AppendLog(AppendLogArgs {
            term: 0,
            prev_log_index: 0,
            prev_log_term: 0,
            entries,
            leader_commit: 0,
        });

        context.empty_queue();
        let outgoing = context.package(request);
        context.send_and_tick(outgoing);

        let expected_reply = AppendLogResponse(AppendLogResponseResult {
            term: 0,
            success: true,
        });
        assert_eq!(expected_reply, context.recv_message().unwrap());
        assert_eq!(2, context.cluster_size());
    }
}
}
mod receives_vote_request {
use super::*;
use crate::server::test_utils::almost_timeout_follower;
/// Granting a vote just before the timeout must keep the node from starting
/// its own election.
#[test]
fn at_startup_when_granting_vote_request_then_resets_timer() {
    let mut builder = ServerBuilder::new();
    let (mut inbox, mut outbox, mut server, _cluster, comm_service) =
        builder.server_in_cluster();
    almost_timeout_follower(&mut server, comm_service);

    send_request_vote(&mut outbox, 0, 0, 1, 1);
    server.tick(comm_service);

    let stayed_follower = follower_didnt_start_election(&mut inbox);
    assert!(stayed_follower)
}
/// A candidate whose last log term is below the follower's gets no vote.
#[test]
fn from_candidate_with_smaller_last_term_in_log__then_request_is_rejected() {
    let mut builder = ServerBuilder::new();
    let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
    add_log_entry_to_server(
        &mut server,
        &mut outbox,
        &mut inbox,
        comm_service,
        some_entry(),
    );

    send_request_vote(&mut outbox, 0, 0, 0, 0);
    server.tick(comm_service);

    let reply = inbox.pop().unwrap();
    assert!(request_vote_not_granted(reply.get_message()));
}
/// After two appends to the follower, a vote request announcing an empty log
/// is refused.
#[test]
fn from_candidate_with_same_last_term_and_shorter_log__then_request_is_rejected() {
    let mut builder = ServerBuilder::new();
    let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
    for _ in 0..2 {
        add_log_entry_to_server(
            &mut server,
            &mut outbox,
            &mut inbox,
            comm_service,
            some_entry(),
        );
    }

    send_request_vote(&mut outbox, 0, 0, 0, 0);
    server.tick(comm_service);

    let reply = inbox.pop().unwrap();
    assert!(request_vote_not_granted(reply.get_message()));
}
/// Vote requests hitting a follower that already stores entries.
mod with_nonempty_log {
    use super::*;

    /// An acceptable candidate log and no earlier vote: the vote is granted.
    #[test]
    fn candidate_has_valid_log__and_voted_granted_to_no_other__then_vote_is_granted() {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
        for _ in 0..2 {
            add_log_entry_to_server(
                &mut server,
                &mut outbox,
                &mut inbox,
                comm_service,
                some_entry(),
            );
        }

        send_request_vote(&mut outbox, 2, 1, 1, 0);
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(request_vote_granted(reply.get_message()));
    }

    /// A second vote request, this time from node 2, is refused.
    #[test]
    fn candidate_has_valid_log__and_voted_granted_to_other__then_vote_is_denied() {
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();
        for _ in 0..2 {
            add_log_entry_to_server(
                &mut server,
                &mut outbox,
                &mut inbox,
                comm_service,
                some_entry(),
            );
        }

        // First request, from node 0; its reply is discarded.
        send_request_vote(&mut outbox, 2, 0, 0, 0);
        server.tick(comm_service);
        inbox.pop().unwrap();

        // Second request, from node 2.
        send_request_vote(&mut outbox, 2, 0, 0, 2);
        server.tick(comm_service);

        let reply = inbox.pop().unwrap();
        assert!(request_vote_not_granted(reply.get_message()));
    }
}
}
/// Client read/write requests arriving at a follower.
mod receives_client_interaction {
    use crate::messages::RaftMessage::{ReadRequestReply, WriteRequestReply};
    use crate::messages::{ReadRequestReplyArgs};
    use crate::server::test_server_builder::ServerBuilder;
    use crate::server::test_utils::{send_read_request, send_write_request};
    use dcs::communication::connection::InMsgQueue;
    use dcs::nodes::SystemNodeId;
    use dcs::rules::measurements::Measurement;
    use dcs::rules::measurements::ClusterType::TEMPERATURE;

    /// A write request is answered with a failed reply naming node 0.
    #[test]
    fn receives_write_request__then_redirects_to_leader() {
        let leader_id = SystemNodeId::from(0);
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();

        let measurement = Measurement::new(TEMPERATURE, 0);
        send_write_request(&mut server, &mut outbox, measurement, comm_service);

        let answer = inbox.pop().unwrap();
        let expected = WriteRequestReply(false, Some(leader_id));
        if expected != answer.body {
            unreachable!("failed")
        }
    }

    /// A read request is answered with a failed, data-less reply that
    /// redirects to node 0.
    #[test]
    fn receives_read_request__then_redirects_to_leader() {
        let leader_id = SystemNodeId::from(0);
        let mut builder = ServerBuilder::new();
        let (mut inbox, mut outbox, mut server, _cluster, comm_service) = builder.follower();

        send_read_request(&mut server, &mut outbox, comm_service);

        let answer = inbox.pop().unwrap();
        let expected = ReadRequestReply(ReadRequestReplyArgs {
            success: false,
            data: None,
            redirect: Some(leader_id),
        });
        if expected != answer.body {
            unreachable!("failed")
        }
    }
}
/// Log compaction triggered on the follower itself.
mod snapshots {
    use super::*;

    /// Crossing the fill threshold must leave more capacity than before.
    #[test]
    pub fn snapshot_occurs_at_75_percent_capacity() {
        let mut builder = ContextBuilder::new();
        let mut context: FollowerContext = builder.follower().into();

        let (before, after) = context.trigger_snapshot_at_75_percent_occupation();

        assert!(before < after);
    }

    /// Appends keep working once a snapshot has been taken.
    #[test]
    pub fn can_receive_append_log_after_snapshot() {
        let mut builder = ContextBuilder::new();
        let mut context: FollowerContext = builder.follower().into();
        context.trigger_snapshot_at_75_percent_occupation();
        context.inner.empty_queue();

        let new_entry = vec![(Some(TestState(3, 4)), 0)];
        context.send_append_log_and_tick(new_entry, 29, 1, u32::MAX);

        let reply = context.inner.recv_message().unwrap();
        assert!(append_log_is_successful(&reply));
        let log_copy = context.inner.get_node().log.clone();
        let last_data = log_copy.last().unwrap().data.unwrap();
        assert_eq!(TestState(3, 4), last_data);
    }
}
pub mod given_install_snapshot {
use dcs::nodes::SystemNodeId;
use super::*;
/// The snapshot entry becomes the last entry of the follower's log.
#[test]
pub fn updates_the_log() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();

    let sent_snapshot = context.install_snapshot();

    assert_eq!(sent_snapshot.data, context.last_log_entry())
}
/// After a snapshot from leader 1, a rejected write request must redirect the
/// client to that leader.
#[test]
pub fn updates_leader_id() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    context.install_snapshot();
    context.inner.empty_queue();
    context.inner.send_and_tick(
        context
            .inner
            .package(WriteRequest(WriteRequestArgs::with_measurement(4.into(), Measurement::new(TEMPERATURE, 1)))),
    );
    let msg = context.inner.recv_message().unwrap();
    let expected_leader = Some(SystemNodeId::from(1));
    // A bare identifier inside a pattern binds a new variable instead of
    // comparing, so the leader id has to be checked in a guard.
    assert!(
        matches!(
            &msg,
            RaftMessage::WriteRequestReply(false, leader) if *leader == expected_leader
        ),
        "Did not expect message {msg:?}"
    );
}
/// Installing a snapshot drops the entries up to `last_included_index` and
/// adds exactly one snapshot entry.
#[test]
pub fn replaces_up_to_last_included_entry_with_snapshot() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    // Four consecutive single-entry appends, starting from an empty log.
    let payloads = [
        TestState(1, 1),
        TestState(2, 2),
        TestState(3, 3),
        TestState(4, 4),
    ];
    for (prev_log_index, payload) in (0..).zip(payloads) {
        context.send_append_log_and_tick(
            vec![(Some(payload), prev_log_term)],
            prev_log_index,
            prev_log_term,
            32,
        );
    }

    let log_before_snapshot = context.log();
    let install_snapshot = context.install_snapshot();
    let log_after_snapshot = context.log();

    let removed = install_snapshot.last_included_index as usize;
    let snapshot_entries_added = 1;
    let expected_len = log_before_snapshot.len() - removed + snapshot_entries_added;
    assert_eq!(expected_len, log_after_snapshot.len())
}
/// A snapshot from an older term is answered with the current term and
/// leaves the log as it was.
#[test]
pub fn if_term_is_less_than_current_term_responds_immediately_and_dont_update() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    // Four consecutive single-entry appends, starting from an empty log.
    let payloads = [
        TestState(1, 1),
        TestState(2, 2),
        TestState(3, 3),
        TestState(4, 4),
    ];
    for (prev_log_index, payload) in (0..).zip(payloads) {
        context.send_append_log_and_tick(
            vec![(Some(payload), prev_log_term)],
            prev_log_index,
            prev_log_term,
            32,
        );
    }
    context.inner.empty_queue();

    let log_before = context.log();
    context.set_term(15);
    context.install_snapshot();
    let log_after = context.log();

    let message = context.inner.recv_message().unwrap();
    assert!(matches!(message, RaftMessage::InstallSnapshotResponse(15)));
    assert_eq!(log_before, log_after);
}
/// The install-snapshot reply carries the follower's own term.
#[test]
pub fn responds_with_current_term() {
    let mut builder = ContextBuilder::new();
    let mut context: FollowerContext = builder.follower().into();
    let prev_log_term = 1;

    // Four consecutive single-entry appends, starting from an empty log.
    let payloads = [
        TestState(1, 1),
        TestState(2, 2),
        TestState(3, 3),
        TestState(4, 4),
    ];
    for (prev_log_index, payload) in (0..).zip(payloads) {
        context.send_append_log_and_tick(
            vec![(Some(payload), prev_log_term)],
            prev_log_index,
            prev_log_term,
            32,
        );
    }
    context.inner.empty_queue();

    context.set_term(5);
    context.install_snapshot();

    let message = context.inner.recv_message().unwrap();
    assert!(
        matches!(message, RaftMessage::InstallSnapshotResponse(5)),
        "Wasn't expecting {message:?}"
    );
}
}
/// A second payload, distinguishable from [`some_entry`].
pub fn some_other_entry() -> Option<TestState> {
    let payload = TestState(37, 12);
    Some(payload)
}
/// `true` only for an `AppendLogResponse` whose `success` flag is set.
fn append_log_is_successful(msg: &RaftMessage<TestState>) -> bool {
    match msg {
        AppendLogResponse(AppendLogResponseResult { success, .. }) => *success,
        _ => false,
    }
}
/// `true` only for an `AppendLogResponse` whose `success` flag is cleared.
fn append_log_is_not_successful(msg: &RaftMessage<TestState>) -> bool {
    match msg {
        AppendLogResponse(AppendLogResponseResult { success, .. }) => !*success,
        _ => false,
    }
}
/// Queues a single-entry `AppendLog` request from node 0 to node 1.
///
/// The request uses term 1 and leader commit 0; the entry itself is created
/// with `prev_log_term` as its term.
fn send_append_log<T: LogData + Debug>(
    out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
    prev_log_index: LogIndex,
    prev_log_term: Term,
    data: Option<T>,
) {
    let mut entries: Log<T> = Default::default();
    let _ = entries.push(LogEntry::with_data(prev_log_term, data));

    let args = AppendLogArgs {
        term: 1,
        prev_log_index,
        prev_log_term,
        entries,
        leader_commit: 0,
    };
    let request = package(AppendLog(args))
        .from(0.into())
        .to(1.into())
        .build()
        .unwrap();
    out_queue_test.push(request);
}
/// Feeds the given payloads to the server one append at a time, ticking the
/// server and discarding its reply after each one.
///
/// The n-th payload (counting from zero) is sent with `prev_log_index = n`
/// and `prev_log_term = 1`.
fn add_log_entries_to_server<L: LogData + Debug>(
    server: &mut RaftService<FakeTimer, L>,
    out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
    in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    entry: Vec<Option<L>>,
) {
    let mut prev_log_index = 0usize;
    for payload in entry {
        send_append_log(out_queue_test, prev_log_index as LogIndex, 1, payload);
        server.tick(comm_service);
        in_queue_test.pop().unwrap();
        prev_log_index += 1;
    }
}
/// Sends one append (`prev_log_index = 0`, `prev_log_term = 1`) carrying
/// `entry`, ticks the server and discards its reply.
fn add_log_entry_to_server<L: LogData + Debug>(
    server: &mut RaftService<FakeTimer, L>,
    out_queue_test: &mut dyn OutMsgQueue<RaftPackage<L>>,
    in_queue_test: &mut dyn InMsgQueue<RaftPackage<L>>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    entry: Option<L>,
) {
    let (prev_log_index, prev_log_term) = (0, 1);
    send_append_log(out_queue_test, prev_log_index, prev_log_term, entry);
    server.tick(comm_service);
    let _reply = in_queue_test.pop().unwrap();
}
/// `true` only for a `RequestVoteResponse` whose `granted` flag is set.
fn request_vote_granted(msg: &RaftMessage<TestState>) -> bool {
    match msg {
        RequestVoteResponse(RequestVoteResponseResult { granted, .. }) => *granted,
        _ => false,
    }
}
/// `true` only for a `RequestVoteResponse` whose `granted` flag is cleared.
fn request_vote_not_granted(msg: &RaftMessage<TestState>) -> bool {
    match msg {
        RequestVoteResponse(RequestVoteResponseResult { granted, .. }) => !*granted,
        _ => false,
    }
}
/// Queues a `RequestVote` request from node `from_id` to node 1.
fn send_request_vote<T: LogData + Debug>(
    out_queue_test: &mut dyn OutMsgQueue<RaftPackage<T>>,
    prev_log_index: LogIndex,
    prev_log_term: Term,
    term: Term,
    from_id: u32,
) {
    let args = RequestVoteArgs {
        term,
        prev_log_index,
        prev_log_term,
    };
    let candidate = SystemNodeId::from(from_id);
    let vote_request = package(RequestVote(args))
        .from(candidate)
        .to(1.into())
        .build()
        .unwrap();
    out_queue_test.push(vote_request);
}
pub fn follower_didnt_start_election(
comm_service: &mut dyn InMsgQueue<RaftPackage<TestState>>,
) -> bool {
let append_log_response =
RaftMessage::<TestState>::AppendLogResponse(AppendLogResponseResult {
term: 0,
success: true,
});
let actual_append_log_response = comm_service.pop().unwrap().body;
matches!(append_log_response, actual_append_log_response) && comm_service.pop().is_none()
}
}
#[cfg(test)]
mod test_utils {
use dcs::nodes::SystemNodeId;
use crate::messages::RaftMessage::{AppendLog, InstallSnapshot};
use crate::messages::{
AppendLogArgs, InstallSnapshotArgs, Log, LogEntry, LogIndex, Term, LOG_LEN,
};
use crate::server::leader::test_utils::LeaderContext;
use crate::server::test_server_builder::TestContext;
use crate::server::test_utils::TestState;
use dcs::rules::measurements::Measurement;
use dcs::rules::measurements::ClusterType::TEMPERATURE;
/// Test helper wrapping a `TestContext` with follower-specific conveniences
/// (sending append-log / install-snapshot requests, inspecting the log).
pub struct FollowerContext<'a> {
// The wrapped generic test context; public so tests can reach its helpers directly.
pub inner: TestContext<'a>,
}
impl<'a> FollowerContext<'a> {
    /// Overwrites the term of the node under test.
    pub fn set_term(&mut self, term: Term) {
        let node = self.inner.get_node_mut();
        node.term = term;
    }

    /// Appends entries until the log has passed the fill level used by the
    /// snapshot tests and returns the log capacity measured before and after
    /// the final batch of appends.
    pub(crate) fn trigger_snapshot_at_75_percent_occupation(&mut self) -> (f32, f32) {
        let count_to_fill_log = LOG_LEN as f32 * 0.74;
        let fill_end = count_to_fill_log.floor() as u32;
        for prev_log_index in 0..fill_end {
            self.send_append_log_and_tick(
                vec![(Some(TestState(2, 2)), 1)],
                prev_log_index,
                1,
                u32::MAX,
            );
        }
        let pre_snapshot_capacity = self.inner.get_node().log.capacity();

        let count_to_snapshot = count_to_fill_log * 1.1;
        let overflow_start = count_to_fill_log as u32;
        let overflow_end = count_to_snapshot.ceil() as u32;
        for prev_log_index in overflow_start..overflow_end {
            self.send_append_log_and_tick(
                vec![(Some(TestState(2, 2)), 1)],
                prev_log_index,
                1,
                u32::MAX,
            );
        }
        let post_snapshot_capacity = self.inner.get_node().log.capacity();

        (pre_snapshot_capacity, post_snapshot_capacity)
    }

    /// Sends one `AppendLog` request (term 1) built from `data` — a list of
    /// `(payload, entry term)` pairs — and ticks the node.
    pub fn send_append_log_and_tick(
        &mut self,
        data: Vec<(Option<TestState>, Term)>,
        prev_log_index: LogIndex,
        prev_log_term: Term,
        leader_commit: LogIndex,
    ) {
        let mut entries: Log<TestState> = Default::default();
        for (payload, entry_term) in data {
            let _ = entries.push(LogEntry::with_data(entry_term, payload));
        }

        let request = AppendLog(AppendLogArgs {
            term: 1,
            prev_log_index,
            prev_log_term,
            entries,
            leader_commit,
        });
        let outgoing = self.inner.package(request);
        self.inner.send_and_tick(outgoing);
    }

    /// Sends a fixed `InstallSnapshot` request (term 7, leader 1, last
    /// included index 3 / term 1), ticks the node and returns the arguments
    /// that were sent.
    pub fn install_snapshot(&mut self) -> InstallSnapshotArgs<TestState> {
        let snapshot_entry = LogEntry::new(7, Some(TestState(5, 7)), None, None);
        let args = InstallSnapshotArgs {
            term: snapshot_entry.term,
            leader_id: SystemNodeId::from(1),
            last_included_index: 3,
            last_included_term: 1,
            data: snapshot_entry,
        };

        let outgoing = self.inner.package(InstallSnapshot(args.clone()));
        self.inner.send_and_tick(outgoing);
        args
    }

    /// Returns a copy of the last entry of the node's log; panics on an
    /// empty log.
    pub fn last_log_entry(&self) -> LogEntry<TestState> {
        let log_copy = self.inner.get_node().log.clone();
        log_copy.last().unwrap().clone()
    }

    /// Returns a copy of the node's whole log.
    pub fn log(&self) -> Log<TestState> {
        let node = self.inner.get_node();
        node.log.clone()
    }
}
impl<'a> From<TestContext<'a>> for FollowerContext<'a> {
fn from(context: TestContext<'a>) -> Self {
Self { inner: context }
}
}
}
/// Declares a follower test fixture in the caller's scope.
///
/// Binds three caller-named variables: `$in_queue_test` (receiving end of the
/// queue the server writes to), `$out_queue_test` (sending end of the queue
/// the server reads from) and `$server`, built with `Server::new` over a
/// `FakeTimer`, both queues and a cluster map holding members 0 and 2.
/// The server gets id 1 and leader id 0.
///
/// `FakeTimer`, `FakeMsgQueue`, `LinearMap`, `ClusterMemberId`,
/// `ClusterMember`, `CLUSTER_NODE_COUNT`, `Abstained` and `Server` are
/// referenced by bare name, so they must be in scope where the macro is used.
///
/// NOTE(review): the rest of this file builds `RaftService` values through
/// `ServerBuilder` / `ContextBuilder` and stores the leader id as an `Option`;
/// this macro assigns a bare `0` and uses `Server::new`, so it looks stale —
/// confirm it is still used before relying on it.
#[macro_export]
macro_rules! initialize_follower {
($in_queue_test:ident, $out_queue_test:ident, $server:ident ) => {
let mut timer: FakeTimer = FakeTimer::new();
// The election timer is reset twice in a row; the reason is not visible here.
timer.reset_election_timer();
timer.reset_election_timer();
// Two queue pairs: one carries server -> test traffic, the other test -> server.
let (mut $in_queue_test, mut out_queue_server) = FakeMsgQueue::new();
let (mut in_queue_server, mut $out_queue_test) = FakeMsgQueue::new();
let mut cluster: LinearMap<ClusterMemberId, ClusterMember, CLUSTER_NODE_COUNT> =
LinearMap::new();
// Peer 0; the insert result is deliberately ignored.
let _ = cluster.insert(
0,
ClusterMember {
id: 0,
vote_granted: Abstained,
match_idx: 0,
next_idx: 0,
},
);
// Peer 2; the insert result is deliberately ignored.
let _ = cluster.insert(
2,
ClusterMember {
id: 2,
vote_granted: Abstained,
match_idx: 0,
next_idx: 0,
},
);
let mut $server = Server::new(
&mut timer,
&mut out_queue_server,
&mut in_queue_server,
cluster,
);
// The node under test is member 1 and starts out following member 0.
$server.id = 1;
$server.leader_id = 0;
};
}