#![feature(slice_partition_dedup)]
use dcs::communication::messages::{Package, PackageBuilder};
use dcs::communication::service::CommunicationService;
use dcs::coordination::Stopwatch;
use crate::messages::{RaftMessage, RequestVoteArgs, WriteRequestArgs};
use crate::server::leader::leader_behavior::Leader;
use crate::server::MemberState::Follower;
use crate::server::{LeaderBehavior, LogData, RaftPackage, RaftService, MemberState};
// Message handling and periodic work performed by a `RaftService` while it holds the
// leader role.
impl<T: Stopwatch, L: LogData> LeaderBehavior<T, L> for RaftService<T, L> {
    /// Dispatches one incoming package and returns the role this node holds afterwards.
    ///
    /// Messages that need more than a few lines of handling are delegated to the helper
    /// functions on `dyn Leader<T, L>`.
    fn parse_message(
        &mut self,
        package: RaftPackage<L>,
        comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    ) -> MemberState {
        let Package { header, body } = package;
        match body {
            RaftMessage::InstallSnapshotResponse(term) => {
                // A response carrying a newer term means this node has been superseded.
                if term > self.term {
                    self.become_follower_of(header.from.into(), term);
                    return Follower;
                }
                let last_included = self.log.last_included_index();
                if let Some(member) = self.cluster.get_mut(&header.from) {
                    // Replication resumes right after the snapshot.
                    member.next_idx = last_included + 1;
                }
                if let Some(member) = self.cluster.get(&header.from) {
                    let member_is_behind = member.next_idx <= self.get_last_log_index();
                    if member_is_behind {
                        self.send_next_entry_to(member, comm_service)
                    }
                }
                self.current_role
            }
            response @ RaftMessage::AppendLogResponse(_) => {
                <dyn Leader<T, L>>::handle_append_log_response(
                    self,
                    header,
                    comm_service,
                    response,
                )
            }
            RaftMessage::RequestVoteResponse(_) => {
                self.update_ttl(&header);
                self.current_role
            }
            RaftMessage::RequestVote(RequestVoteArgs { term, .. }) => {
                self.update_ttl(&header);
                // A candidate with a newer term takes over leadership.
                if term > self.term {
                    let new_leader = header.from.into();
                    self.become_follower_of(new_leader, term);
                }
                self.current_role
            }
            RaftMessage::WriteRequest(WriteRequestArgs { id, measurement, rule }) => {
                // A measurement takes precedence over a rule when both are present.
                match (measurement, rule) {
                    (Some(raw), _) => {
                        let data = L::from((id, raw));
                        <dyn Leader<T, L>>::handle_write_measurement_request(
                            self,
                            header,
                            data,
                            comm_service,
                        )
                    }
                    (None, Some(change)) => <dyn Leader<T, L>>::handle_write_rule_request(
                        self,
                        header,
                        change,
                        comm_service,
                    ),
                    (None, None) => self.current_role,
                }
            }
            RaftMessage::ReadRequest => {
                <dyn Leader<T, L>>::handle_read_request(self, header, comm_service)
            }
            RaftMessage::ConfigChange(new_config) => {
                <dyn Leader<T, L>>::handle_config_change(self, header, comm_service, new_config)
            }
            unexpected => {
                log::warn!("Unexpected message received as Leader: {:?}", unexpected);
                self.current_role
            }
        }
    }

    /// Runs the leader's per-tick housekeeping.
    fn after_tick(&mut self, comm_service: &mut dyn CommunicationService<RaftPackage<L>>) {
        <dyn Leader<T, L>>::handle_after_tick(self, comm_service)
    }
}
mod leader_behavior {
use dcs::communication::messages::{UpdateClusterVec, Header, PackageBuilder};
use dcs::communication::service::CommunicationService;
use dcs::coordination::Stopwatch;
use dcs::heapless;
use dcs::nodes::SystemNodeId;
use std::cmp::{max, min};
use log::warn;
use crate::messages::*;
use crate::server::MemberState::Follower;
use crate::server::*;
use crate::RaftMessage::ConfigChange;
/// Empty marker trait. Its trait-object type `dyn Leader<T, L>` is used as a namespace
/// for the leader-only helper functions of the inherent impl that follows.
pub trait Leader<T, L> {}
impl<T: Stopwatch, L: LogData> dyn Leader<T, L> {
/// Per-tick leader housekeeping: refreshes the commit index, pushes a pending final
/// configuration if possible and, once the timer has expired, sends heartbeats and
/// re-arms the heartbeat timeout.
pub fn handle_after_tick(
    server: &mut RaftService<T, L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    server.update_commit_index();
    Self::propagate_new_config(server, comm_service);

    let heartbeat_is_due = server.io.timer.is_timeout();
    if !heartbeat_is_due {
        return;
    }
    server.send_heartbeat_to_followers(comm_service);
    server.io.set_heartbeat_timeout();
}
/// Replicates the pending final configuration (`server.next_config`) once the most recent
/// configuration entry in the log lies below the commit index.
///
/// Does nothing when the log holds no configuration entry, when that entry is not yet
/// below the commit index, or when no configuration is pending.
pub fn propagate_new_config(
    server: &mut RaftService<T, L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    // Position (within the iterated log) of the latest entry carrying a config change.
    let latest_config_position = server
        .log
        .iter()
        .rposition(|entry| entry.config_change.is_some());
    let latest_config_is_committed = match latest_config_position {
        Some(position) => (position as u32) < server.commit_index,
        None => false,
    };
    if !latest_config_is_committed {
        return;
    }

    if let Some(config) = server.next_config.take() {
        Self::update_config(server, &config);
        log::debug!("Replicating new config {config:?}");
        let entry = LogEntry::with_config(server.term, Some(config));
        server.replicate_entry(entry.clone(), comm_service);
        server.log.push(entry);
    }
}
/// Processes a follower's answer to an `AppendLog` request.
///
/// * On rejection the follower's `next_idx` is moved one step back (never below 1) and
///   either a snapshot or the entry at the new `next_idx` is sent.
/// * On success `next_idx` is advanced and, if the follower is still behind the leader's
///   log, the next entry is sent.
///
/// Any message other than `RaftMessage::AppendLogResponse` is ignored. The leader's
/// current role is returned in every case.
pub fn handle_append_log_response(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    message: RaftMessage<L>,
) -> MemberState {
    let success = match message {
        RaftMessage::AppendLogResponse(AppendLogResponseResult { success, .. }) => success,
        _ => return server.current_role,
    };

    server.update_ttl(&header);

    if let Some(member) = server.cluster.get_mut(&header.from) {
        member.next_idx = if success {
            member.next_idx + 1
        } else {
            // `saturating_sub` keeps a `next_idx` of 0 from underflowing; the result is
            // clamped to 1 either way.
            max(1, member.next_idx.saturating_sub(1))
        };
    }

    if let Some(member) = server.cluster.get(&header.from) {
        if success {
            let member_is_behind = member.next_idx <= server.get_last_log_index();
            if member_is_behind {
                server.send_next_entry_to(member, comm_service)
            }
        } else if server.log.is_snapshot(member.next_idx)
            && server.log.last_included_index() > 1
        {
            // The entry the follower needs has already been folded into the snapshot.
            server.send_install_snapshot_to(member, comm_service)
        } else {
            server.send_next_entry_to(member, comm_service)
        }
    }

    server.current_role
}
pub fn handle_config_change(
server: &mut RaftService<T, L>,
header: Header<SystemNodeId>,
comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
new_config: UpdateClusterVec,
) -> MemberState {
info!("[REMOVE] trying to replicate {new_config:?}");
let replicated = Self::try_replicate_config(new_config, server, comm_service);
info!("[REMOVE] replicated: {replicated}");
let response = package(RaftMessage::ConfigChangeACK(replicated))
.from(server.id.into())
.to(header.from)
.build()
.unwrap();
comm_service.push(response);
server.current_role
}
/// Starts a two-step configuration change.
///
/// When `new_config` differs from the current configuration it is stored as the pending
/// final configuration, a temporary configuration built from the requested and the current
/// members is applied, and that temporary configuration is handed to log replication.
/// Returns `true` when no change was needed or the entry was accepted for replication.
fn try_replicate_config(
    mut new_config: UpdateClusterVec,
    server: &mut RaftService<T, L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> bool {
    let change_needed = Self::config_requires_changes(&mut new_config, server);
    if !change_needed {
        return true;
    }

    server.next_config = Some(new_config.clone());
    let joint_config = Self::create_temp_config(new_config, server);
    server.update_config(&joint_config);
    let joint_entry = LogEntry::with_config(server.term, Some(joint_config));
    Self::try_replicate_entry(joint_entry, server, comm_service)
}
fn update_config(server: &mut RaftService<T, L>, new_config: &UpdateClusterVec) {
for member in new_config {
let member = *member;
if !server.cluster.contains_key(&member) && member != server.id {
let new_member = ClusterMember {
id: member,
vote_granted: ElectionVote::Abstained,
next_idx: 1,
match_idx: 0,
last_successful_heartbeat: 0,
};
debug!("Adding new member to cluster {new_member:?}");
server.cluster.insert(member, new_member);
}
}
let mut members_to_delete = heapless::Vec::<SystemNodeId, CLUSTER_NODE_COUNT>::new();
for system_id in server.cluster.keys() {
if !new_config.contains(&system_id) {
members_to_delete.push(*system_id);
}
}
members_to_delete.iter().for_each(|member| {
log::debug!("Removing member from cluster {member:?}");
server.cluster.remove(member);
});
}
/// Builds the temporary configuration used while a membership change is in flight: the
/// sorted, duplicate-free combination of `new_config` and the ids currently stored in
/// `server.cluster`.
///
/// Ids that do not fit into the resulting `UpdateClusterVec` are dropped with a warning.
fn create_temp_config(
    mut new_config: UpdateClusterVec,
    server: &mut RaftService<T, L>,
) -> UpdateClusterVec {
    const COMBINED_CAPACITY: usize = CLUSTER_NODE_COUNT * 2;
    let mut combined: heapless::Vec<SystemNodeId, COMBINED_CAPACITY> = heapless::Vec::new();
    combined.extend(new_config);
    combined.extend(server.cluster.values().map(|member| member.id));
    combined.sort();

    let mut merged = UpdateClusterVec::new();
    for node in combined {
        if merged.contains(&node) {
            continue;
        }
        if merged.push(node).is_err() {
            warn!("Couldn't insert new member with id {} into cluster. Max capacity reach.", node);
        }
    }
    merged
}
/// Tells whether `new_config` differs from the server's current configuration.
///
/// An empty `new_config` never requires a change. Configurations of different length
/// always do. Otherwise both are sorted (note: `new_config` is sorted in place) and
/// compared element by element.
fn config_requires_changes(
    new_config: &mut UpdateClusterVec,
    server: &mut RaftService<T, L>,
) -> bool {
    if new_config.is_empty() {
        return false;
    }

    let mut current_config = server.current_config();
    if new_config.len() != current_config.len() {
        return true;
    }

    current_config.sort();
    new_config.sort();
    let identical = current_config
        .iter()
        .zip(new_config.iter())
        .all(|(current, requested)| current == requested);
    !identical
}
/// Answers a read request if leadership could be confirmed; otherwise rejects it and
/// reports `Follower` as the resulting role.
pub fn handle_read_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
    if !Self::maintains_leadership(server, comm_service) {
        Self::reject_read_request(server, header, comm_service);
        return Follower;
    }
    Self::answer_read_request(server, header, comm_service);
    server.current_role
}
/// Sends an unsuccessful, data-less read reply to the sender of `header`.
///
/// # Panics
/// Panics if the reply package cannot be built.
fn reject_read_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let rejection = ReadRequestReply(ReadRequestReplyArgs {
        success: false,
        data: None,
        redirect: None,
    });
    let outgoing = server.build_message(rejection, header.from).unwrap();
    comm_service.push(outgoing)
}
/// Sends a successful read reply whose payload is every data-carrying log entry folded
/// together with `L::merge` (`None` when the log holds no data).
///
/// # Panics
/// Panics if the reply package cannot be built.
fn answer_read_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) {
    let log_snapshot = server.log.clone();
    let accumulated = log_snapshot
        .into_iter()
        .filter_map(|entry| entry.data)
        .reduce(|merged, next| L::merge(merged, next));
    let answer = RaftMessage::ReadRequestReply(ReadRequestReplyArgs {
        success: true,
        data: accumulated,
        redirect: None,
    });
    let outgoing = server.build_message(answer, header.from).unwrap();
    comm_service.push(outgoing)
}
/// Checks whether this node can still count on a majority of the cluster.
///
/// Heartbeats are sent first; then every member whose last successful heartbeat is younger
/// than the leasing time is counted as a live follower. Leadership is considered
/// maintained when the live followers plus the leader itself form a strict majority.
fn maintains_leadership(
    server: &mut RaftService<T, L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> bool {
    server.send_heartbeat_to_followers(comm_service);
    let heartbeat_ttl = server.io.get_leasing_time_secs();
    debug!(
        "Checking if node (with leasing {} at timestamp {}) maintains leadership in cluster with members: {:?}",
        heartbeat_ttl,
        server.io.timer.current_time_as_secs(),
        server.cluster
    );
    let known_followers = server
        .cluster
        .iter()
        .filter(|(_, node)| {
            server.io.timer.current_time_as_secs() - node.last_successful_heartbeat < heartbeat_ttl
        })
        .count();
    debug!("Maintained leadership of {} nodes out of {}", known_followers, server.cluster.len());

    // `server.cluster` holds only the *other* members (`update_config` never inserts the
    // server's own id), so the full cluster has `len + 1` nodes. A strict majority of that
    // needs `(len + 1) / 2` followers besides the leader. The former `len / 2` accepted a
    // mere half of the nodes whenever the total node count was even.
    let followers_needed = (server.cluster.len() + 1) / 2;
    known_followers >= followers_needed
}
/// Replicates `entry` to the followers and appends it to the local log, but only while
/// leadership is maintained. Returns whether the entry was accepted.
fn try_replicate_entry(
    entry: LogEntry<L>,
    server: &mut RaftService<T, L>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> bool {
    let still_leader = Self::maintains_leadership(server, comm_service);
    if !still_leader {
        debug!("Rejected log replication request: {:?}", server.log);
        return false;
    }

    server.replicate_entry(entry.clone(), comm_service);
    // The result of the append is intentionally discarded, as before.
    let _ = server.log.push(entry);
    trace!("Updated log to: {:?}", server.log);
    true
}
/// Wraps `new_rule` in a log entry of the current term and runs it through the common
/// write path.
pub fn handle_write_rule_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    new_rule: Rule,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
    let current_term = server.term;
    let rule_entry = LogEntry::with_rule(current_term, new_rule);
    Self::handle_new_entry(server, header, comm_service, rule_entry)
}
/// Wraps the measurement data `args` in a log entry of the current term and runs it
/// through the common write path.
pub fn handle_write_measurement_request(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    args: L,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
) -> MemberState {
    let current_term = server.term;
    let data_entry = LogEntry::with_data(current_term, Some(args));
    Self::handle_new_entry(server, header, comm_service, data_entry)
}
/// Common write path: tries to replicate `entry` and reports the outcome to the requester
/// with a `WriteRequestReply`.
///
/// The reply is not queued when sender and receiver of the built package are the same.
///
/// # Panics
/// Panics if the reply package cannot be built.
fn handle_new_entry(
    server: &mut RaftService<T, L>,
    header: Header<SystemNodeId>,
    comm_service: &mut dyn CommunicationService<RaftPackage<L>>,
    entry: LogEntry<L>,
) -> MemberState {
    let replicated = Self::try_replicate_entry(entry, server, comm_service);

    // Accepting and rejecting replies differ only in the success flag.
    let reply = package(RaftMessage::WriteRequestReply(replicated, None))
        .from(server.id.into())
        .to(header.from)
        .build()
        .unwrap();

    if reply.get_sender() != reply.get_receiver() {
        comm_service.push(reply);
    }
    server.current_role
}
}
}
#[allow(non_snake_case)]
#[cfg(test)]
mod leader_tests {
extern crate alloc;
use core::fmt::Debug;
use std::cmp::Ordering;
use std::iter::{repeat, FromIterator};
use crate::ELECTION_TIMEOUT;
use dcs::communication::connection::{InMsgQueue, OutMsgQueue};
use dcs::communication::messages::{Header, PackageBuilder};
use dcs::communication::service::CommunicationService;
use dcs::nodes::SystemNodeId;
use dcs::rules::measurements::Measurement;
use dcs::rules::measurements::ClusterType::TEMPERATURE;
use crate::messages::*;
use crate::server::test_server_builder::ContextBuilder;
use crate::server::test_utils::*;
/// A read request makes the leader send one heartbeat per other cluster member.
#[test]
fn given_leader__when_receives_read_only_request__then_sends_heartbeats_to_followers() {
    let mut builder = ContextBuilder::new();
    let mut context = builder.leader();

    // Let a full election timeout worth of ticks pass before reading.
    for _ in 0..ELECTION_TIMEOUT as usize {
        context.tick();
    }
    context.send_read_request_and_tick();

    let expected_heartbeats = context.cluster_size() - 1;
    assert_eq!(expected_heartbeats, context.count_heartbeats())
}
/// A write request must result in a broadcast of a log entry that carries the written data
/// and the leader's current term.
#[test]
fn given_leader__when_receives_client_request__then_append_request_to_log_and_replicates_log() {
let mut builder = ContextBuilder::new();
let mut context = builder.leader();
// Discard whatever was queued while the leader was set up.
context.empty_queue();
context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
let expected_entry = LogEntry::with_data(context.term(), Some(TestState(3, 0)));
assert!(context.entry_was_broadcasted(expected_entry))
}
/// Data written through the leader is returned by a subsequent read request.
#[test]
fn given_write_request__then_can_read_data() {
let mut builder = ContextBuilder::new();
let mut context = builder.leader();
context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
// Drop the messages caused by the write so only the read reply is inspected.
context.empty_queue();
context.send_read_request_and_tick();
let data = context.get_read_response().and_then(|args| args.data);
assert_eq!(Some(TestState(3, 0)), data)
}
/// Once every follower has accepted the replicated entry, the next `AppendLog` request
/// advertises a leader commit index of 2.
#[test]
fn given_write_request__when_replicating_state__and_all_accept_appendlog__then_data_is_commited() {
    let mut builder = ContextBuilder::new();
    let mut context = builder.leader();
    context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
    context.empty_queue();

    for follower in context.followers() {
        context.accept_append_log(follower);
        context.tick();
    }

    // A second write triggers new AppendLog requests carrying the updated commit index.
    context.send_write_request_and_tick(some_measurement());
    let last_request = context.append_log_requests().last().cloned().unwrap();
    assert_eq!(2, last_request.leader_commit)
}
/// After node 1 rejects an `AppendLog`, the next package the leader emits is another
/// `AppendLog`.
#[test]
fn given_append_log_rejection__then_decrease_next_index_and_retries() {
let mut builder = ContextBuilder::new();
let mut context = builder.leader();
context.send_write_request(some_measurement());
context.empty_queue();
context.reject_append_log(1.into());
context.tick();
// The first package received after the rejection is expected to be the retry.
let retry_pkg = context.recv().unwrap();
let retry_msg = retry_pkg.get_message();
assert!(matches!(retry_msg, &RaftMessage::AppendLog::<TestState>(_)));
}
/// Returns an arbitrary fixed measurement (`TEMPERATURE`, value 3) for tests that do not
/// care about the payload.
fn some_measurement() -> Measurement {
Measurement::new(TEMPERATURE, 3)
}
/// After three writes, the messages produced by the third write must contain an
/// `AppendLog` request whose `prev_log_index` is 3.
#[test]
fn given_append_log_success__then_increase_next_index() {
    let mut builder = ContextBuilder::new();
    let mut context = builder.leader();
    context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
    context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
    // Only the messages caused by the third write are of interest.
    context.empty_queue();
    context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));

    let expected_request_was_sent = context.messages().iter().any(|pkg| {
        matches!(
            pkg.get_message(),
            &RaftMessage::AppendLog::<TestState>(AppendLogArgs {
                prev_log_index: 3,
                ..
            })
        )
    });
    // Fail with an explanation instead of an empty `unreachable!`.
    assert!(
        expected_request_was_sent,
        "expected an AppendLog request with prev_log_index 3 to be sent"
    );
}
/// Reading after two writes of value 3 yields the merged state `TestState(6, 0)`.
#[test]
fn given_multiple_write_request__when_reading_data__result_is_the_accumulation_of_all_log_states(
) {
let mut builder = ContextBuilder::new();
let mut context = builder.leader();
context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
context.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
// Drop the write traffic so that only the read reply remains to be inspected.
context.empty_queue();
context.send_read_request_and_tick();
let data = context.get_read_response().unwrap().data.unwrap();
assert_eq!(TestState(6, 0), data)
}
}
#[cfg(test)]
mod config_change_tests {
use std::cmp::Ordering;
use dcs::communication::connection::*;
use crate::server::leader::test_utils::*;
use crate::server::test_server_builder::*;
use crate::server::test_utils::*;
use crate::server::*;
use crate::*;
/// Requesting the configuration the cluster already has is acknowledged positively and
/// produces no further message.
#[test]
fn empty_config_change_is_accepted_and_cluster_doesnt_change() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.send_config(context.current_config());
// The first (and only) message must be the positive acknowledgement.
assert_eq!(
Some(RaftMessage::ConfigChangeACK(true)),
context.recv().map(|p| p.body)
);
// Nothing else, in particular no replicated config entry, may follow.
let change_in_log = context.recv();
assert!(change_in_log.is_none());
}
// Tests covering the state right after a config change was requested, i.e. before the
// followers have acknowledged the replicated configuration entry.
mod intermediate_config {
use super::*;
/// Removing a member is acknowledged and leaves the configuration size unchanged for now.
#[test]
fn deleting_one_node_cluster_size_remains_the_same() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.remove_member_from_config();
assert!(context.config_change_acked());
assert_eq!(0, context.cluster_size_diff());
}
/// Adding a member is acknowledged and grows the configuration by one.
#[test]
fn adding_one_node_cluster_size_increased() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.add_member_to_config();
assert!(context.config_change_acked());
assert_eq!(1, context.cluster_size_diff());
}
/// While a member is being removed, the propagated config equals the starting config.
#[test]
fn deleting_one_node_cluster_config_is_propagated() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.remove_member_from_config();
context.assert_config_was_propagated(context.start_config());
}
/// While a member is being added, the propagated config equals the requested config.
#[test]
fn adding_one_node_cluster_config_is_propagated() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.add_member_to_config();
context.assert_config_was_propagated(context.new_config());
}
/// After the followers accept the config entry, later `AppendLog` requests carry a
/// leader commit that is one higher than the captured one.
#[test]
fn config_commited_after_majority_reply() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.add_member_to_config();
context.capture_commit_idx();
context.followers_accept_append_log_request();
context.trigger_entry_replication();
context.assert_leader_commit_was_increased();
}
}
// Tests covering the state after the followers accepted the first replicated
// configuration entry of a config change.
mod final_config {
use super::*;
/// Once the followers accepted the change, the current config equals the requested one.
#[test]
fn after_intermediate_config_commited_uses_final_config() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.remove_member_from_config();
context.followers_accept_append_log_request();
assert_eq!(context.new_config(), context.current_config())
}
/// Once the followers accepted the change, the requested config itself is replicated.
#[test]
fn after_intermediate_config_commited_replicates_final_config() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.remove_member_from_config();
context.followers_accept_append_log_request();
context.assert_config_was_propagated_as_final_config(context.new_config());
}
/// Removed members must not be sent non-empty `AppendLog` requests with leader commit 2.
#[test]
fn if_member_is_removed_from_config_does_not_receive_messages() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
context.remove_member_from_config();
context.followers_accept_append_log_request();
let removed_nodes = context.get_removed_nodes();
// Matches only AppendLog packages that carry entries at leader commit 2.
let package_is_appendlog = |pkg: &RaftPackage<TestState>| {
if let AppendLog(args) = &pkg.body {
args.leader_commit == 2 && !args.entries.is_empty()
} else {
false
}
};
context
.assert_nodes_didnt_received_messages_of_type(removed_nodes, package_is_appendlog)
}
}
}
#[cfg(test)]
mod snapshot_tests {
use dcs::nodes::SystemNodeId;
use crate::messages::{InstallSnapshotArgs, LogEntry, RaftMessage, LOG_LEN};
use crate::server::leader::test_utils::LeaderContext;
use crate::server::test_server_builder::{ContextBuilder, TestContext};
use crate::server::test_utils::TestState;
use dcs::rules::measurements::Measurement;
use dcs::rules::measurements::ClusterType::TEMPERATURE;
/// The log capacity reported after crossing the snapshot threshold must be larger than the
/// capacity reported just before it.
#[test]
pub fn snapshot_occurs_at_75_percent_capacity() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
let (pre_snapshot_capacity, post_snapshot_capacity) =
context.trigger_snapshot_at_75_percent_occupation();
assert!(pre_snapshot_capacity < post_snapshot_capacity);
}
/// When followers keep rejecting `AppendLog` requests after a snapshot was taken, the
/// leader eventually sends an `InstallSnapshot` message.
#[test]
pub fn follower_rejects_append_log_below_snapshot_idx_then_send_install_snapshot() {
    let mut builder = ContextBuilder::new();
    let mut context: LeaderContext = builder.leader().into();
    context.trigger_snapshot_at_75_percent_occupation();

    // One rejection per log slot.
    for _ in 0..LOG_LEN {
        context.followers_reject_append_log_request();
    }

    let install_snapshot_found = context
        .messages()
        .iter()
        .any(|package| matches!(package.get_message(), RaftMessage::InstallSnapshot(_)));
    assert!(install_snapshot_found)
}
/// The first `InstallSnapshot` message sent after repeated rejections carries the expected
/// term, leader id, snapshot index/term and merged data.
#[test]
pub fn snapshot_contains_relevant_data() {
    let mut builder = ContextBuilder::new();
    let mut context: LeaderContext = builder.leader().into();
    context.trigger_snapshot_at_75_percent_occupation();
    for _ in 0..LOG_LEN {
        context.followers_reject_append_log_request();
    }

    let first_install_snapshot = context
        .messages()
        .iter()
        .map(|package| package.get_message())
        .find(|message| matches!(message, RaftMessage::InstallSnapshot(_)))
        .cloned()
        .unwrap();

    match first_install_snapshot {
        RaftMessage::InstallSnapshot(args) => {
            assert_eq!(1, args.term);
            assert_eq!(SystemNodeId::from(0), args.leader_id);
            assert_eq!((LOG_LEN as f32 * 0.75) as u32, args.last_included_index);
            assert_eq!(1, args.last_included_term);
            assert_eq!(LogEntry::new(1, Some(TestState(300, 0)), None, None), args.data);
        }
        _ => unreachable!(),
    }
}
/// After an `InstallSnapshotResponse` with term 40 was received, a write request must be
/// answered with `WriteRequestReply(false, Some(_))`.
#[test]
pub fn if_follower_responds_with_greater_term_then_become_follower() {
let mut builder = ContextBuilder::new();
let mut context: LeaderContext = builder.leader().into();
// Despite its name this helper delivers an InstallSnapshotResponse carrying term 40.
context.send_install_snapshot_and_tick(40);
context.context.empty_queue();
context
.context
.send_write_request_and_tick(Measurement::new(TEMPERATURE, 3));
// Pick the reply to the write request out of everything that was sent.
let msg = context
.context
.messages()
.iter()
.find(|pkg| matches!(pkg.body, RaftMessage::WriteRequestReply(_, _)))
.cloned()
.unwrap();
assert!(
matches!(msg.body, RaftMessage::WriteRequestReply(false, Some(_))),
"Wasn't expecting {msg:?}"
);
}
}
#[cfg(test)]
pub mod test_utils {
use std::cmp::Ordering;
use dcs::communication::connection::*;
use dcs::rules::measurements::ClusterType::TEMPERATURE;
use crate::messages::RaftMessage::InstallSnapshotResponse;
use crate::server::test_server_builder::*;
use crate::server::test_utils::*;
use crate::server::*;
use crate::*;
/// Test helper that wraps a [`TestContext`] and remembers configuration and commit-index
/// values between the steps of a leader test.
pub struct LeaderContext<'a> {
/// The wrapped test context all operations are forwarded to.
pub context: TestContext<'a>,
/// Configuration captured right before a membership change was requested.
start_config: Option<UpdateClusterVec>,
/// Configuration that was requested by the last membership change.
new_config: Option<UpdateClusterVec>,
/// Leader commit index remembered by `capture_commit_idx`.
captured_commit: Option<LogIndex>,
}
impl<'a> LeaderContext<'a> {
/// Delivers an `InstallSnapshotResponse` carrying `term` to the node under test and ticks.
/// Note that, despite the name, it is the *response* message that is sent.
pub fn send_install_snapshot_and_tick(&mut self, term: Term) {
self.context
.send_and_tick(self.context.package(InstallSnapshotResponse(term)));
}
/// Delivers a `ConfigChange` request carrying `config` to the node under test and ticks.
pub(crate) fn send_config(&mut self, config: UpdateClusterVec) {
    let change_message = RaftMessage::<TestState>::ConfigChange(config);
    let change_request = self.context.package(change_message);
    self.context.send_and_tick(change_request);
}
/// Returns the sorted ids of all cluster members plus the id of the node under test.
pub fn current_config(&self) -> UpdateClusterVec {
let mut config: UpdateClusterVec = self
.context
.cluster()
.values()
.map(|node| node.id.into())
.collect();
// The node's own id is added on top of the ids collected from its cluster map.
config.push(self.context.id().into());
config.sort();
config
}
/// Forwards to `TestContext::recv`.
pub fn recv(&mut self) -> Option<RaftPackage<TestState>> {
self.context.recv()
}
/// Forwards to `TestContext::recv_message`.
pub fn recv_message(&mut self) -> Option<RaftMessage<TestState>> {
self.context.recv_message()
}
/// Forwards to `TestContext::messages`.
pub fn messages(&mut self) -> Vec<RaftPackage<TestState>> {
self.context.messages()
}
/// Requests a configuration that lacks the element at index 1 of the current one.
///
/// Records the configuration before the request as `start_config` and the requested one
/// as `new_config`.
pub fn remove_member_from_config(&mut self) {
self.start_config = Some(self.current_config());
let mut config_without_one_member = self.context.current_config();
config_without_one_member.remove(1);
self.new_config = Some(config_without_one_member.clone());
let delete_one = RaftMessage::<TestState>::ConfigChange(config_without_one_member);
self.context.send_and_tick(self.context.package(delete_one));
}
pub fn config_change_acked(&mut self) -> bool {
let mut messages = self.context.messages();
messages
.iter()
.any(|msg| matches!(&RaftMessage::<TestState>::ConfigChangeACK(true), msg))
}
/// Returns the current configuration length minus the length of `start_config`.
///
/// Panics if no membership change was requested before (`start_config` is `None`).
pub fn cluster_size_diff(&mut self) -> i32 {
let start_cluster = self.start_config.as_ref().unwrap();
self.context.current_config().len() as i32 - start_cluster.len() as i32
}
/// Requests a configuration that additionally contains node id 4.
///
/// Records the configuration before the request as `start_config` and the requested one
/// as `new_config`.
pub fn add_member_to_config(&mut self) {
self.start_config = Some(self.current_config());
let mut config_with_added_member = self.context.current_config();
config_with_added_member.push(SystemNodeId::from(4));
self.new_config = Some(config_with_added_member.clone());
let add_one = RaftMessage::<TestState>::ConfigChange(config_with_added_member);
self.context.send_and_tick(self.context.package(add_one));
}
/// Asserts that every non-empty `AppendLog` request carries `config` (compared after
/// sorting) as the config change of its entry at index 1.
///
/// Panics if such a request has no entry at index 1 or that entry has no config change.
pub fn assert_config_was_propagated(&mut self, config: UpdateClusterVec) {
let messages: Vec<_> = self.context.append_log_requests();
messages
.into_iter()
.filter(|args| !args.entries.is_empty())
.for_each(|args| {
// NOTE(review): index 1 (not 0) is inspected; presumably index 0 holds the previous
// entry. Confirm against how AppendLogArgs::entries is filled.
let entry = args.entries.get(1).unwrap();
let mut actual_change = entry.config_change.clone().unwrap();
actual_change.sort();
assert_eq!(config, actual_change)
});
}
/// Returns a copy of the recorded start configuration; panics if none was recorded.
pub fn start_config(&self) -> UpdateClusterVec {
self.start_config.as_ref().unwrap().clone()
}
/// Returns a copy of the recorded requested configuration; panics if none was recorded.
pub fn new_config(&self) -> UpdateClusterVec {
self.new_config.as_ref().unwrap().clone()
}
/// Lets the followers answer the pending `AppendLog` requests positively.
pub fn followers_accept_append_log_request(&mut self) {
self.context.followers_answers_append_log_request(true)
}
/// Lets the followers answer the pending `AppendLog` requests negatively.
pub fn followers_reject_append_log_request(&mut self) {
self.context.followers_answers_append_log_request(false)
}
/// Empties the message queue and then requests yet another configuration (the current one
/// plus node id 99), so that the node sends fresh messages for the new entry.
pub fn trigger_entry_replication(&mut self) {
self.context.empty_queue();
let mut config = self.current_config();
config.push(SystemNodeId::from(99));
let add_one = RaftMessage::<TestState>::ConfigChange(config);
self.context.send_and_tick(self.context.package(add_one));
}
/// Asserts that every non-empty `AppendLog` request advertises a leader commit that is
/// exactly one above the value remembered by `capture_commit_idx`.
pub fn assert_leader_commit_was_increased(&mut self) {
    for args in self.context.append_log_requests() {
        if args.entries.is_empty() {
            continue;
        }
        assert_eq!(self.captured_commit.unwrap() + 1, args.leader_commit);
    }
}
/// Asserts that every non-empty `AppendLog` request whose leader commit and previous log
/// index both equal the node's commit index carries `config` (compared after sorting) as
/// the config change of its entry at index 1.
pub fn assert_config_was_propagated_as_final_config(&mut self, config: UpdateClusterVec) {
let idx = self.context.commit_idx();
let messages: Vec<_> = self.context.append_log_requests();
messages
.into_iter()
.filter(|args| !args.entries.is_empty())
// Keep only requests that were built right on top of the committed entry.
.filter(|args| args.leader_commit == idx)
.filter(|args| args.prev_log_index == idx)
.for_each(|args| {
// NOTE(review): passes vacuously when no request survives the filters above.
let entry = args.entries.get(1).unwrap();
let mut actual_change = entry.config_change.clone().unwrap();
actual_change.sort();
assert_eq!(config, actual_change);
});
}
/// Remembers the leader commit of the last `AppendLog` request; panics if there is none.
pub fn capture_commit_idx(&mut self) {
let appendlog = self.context.append_log_requests().last().cloned().unwrap();
self.captured_commit = Some(appendlog.leader_commit);
}
/// Returns the nodes of the recorded start configuration that are no longer part of the
/// current configuration. Panics if no start configuration was recorded.
pub fn get_removed_nodes(&self) -> Vec<SystemNodeId> {
    let current_config = self.context.current_config();
    self.start_config
        .as_ref()
        .unwrap()
        .iter()
        .filter(|&node| !current_config.contains(node))
        .map(|node| (*node).into())
        .collect()
}
/// Asserts that none of the sent packages addressed to any of `nodes` satisfies `filter`.
pub fn assert_nodes_didnt_received_messages_of_type(
&mut self,
nodes: Vec<SystemNodeId>,
filter: fn(&RaftPackage<TestState>) -> bool,
) {
let messages = self.context.messages();
for node in nodes {
// Look only at packages addressed to this node.
assert!(!messages
.iter()
.filter(|pkg| pkg.header.to == node)
.any(filter))
}
}
/// Fills the log in two phases and returns the value of `log.capacity()` after each phase.
///
/// Phase one performs `floor(LOG_LEN * 0.74)` accepted writes, phase two another
/// `ceil(LOG_LEN * 0.05)`, which takes the total past 75 % of `LOG_LEN`.
pub fn trigger_snapshot_at_75_percent_occupation(&mut self) -> (f32, f32) {
let count = LOG_LEN as f32 * 0.74;
for i in 0..count.floor() as u32 {
self.context
.send_write_request_and_tick(Measurement::new(TEMPERATURE, i as i32));
self.followers_accept_append_log_request();
}
// Capacity just below the 75 % mark.
let pre_snapshot_capacity = self.context.get_node().log.capacity();
let count = LOG_LEN as f32 * 0.05;
for i in 0..count.ceil() as u32 {
self.context
.send_write_request_and_tick(Measurement::new(TEMPERATURE, i as i32));
self.followers_accept_append_log_request();
}
// Capacity after the 75 % mark has been crossed.
let post_snapshot_capacity = self.context.get_node().log.capacity();
(pre_snapshot_capacity, post_snapshot_capacity)
}
}
// Wraps a plain `TestContext` into a `LeaderContext` with nothing recorded yet.
impl<'a> From<TestContext<'a>> for LeaderContext<'a> {
fn from(context: TestContext<'a>) -> Self {
Self {
context,
start_config: None,
new_config: None,
captured_commit: None,
}
}
}
}