use actix::prelude::*;
use log::{error, info, warn};
use crate::{
AppData, AppDataResponse, AppError,
admin::{InitWithConfig, InitWithConfigError, ProposeConfigChange, ProposeConfigChangeError},
common::UpdateCurrentLeader,
messages::{ClientPayload, ClientPayloadResponse, MembershipConfig},
network::RaftNetwork,
raft::{RaftState, Raft, ReplicationState, state::ConsensusState},
replication::{ReplicationStream},
storage::{GetLogEntries, RaftStorage},
};
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<InitWithConfig> for Raft<D, R, E, N, S> {
    type Result = ResponseActFuture<Self, (), InitWithConfigError>;

    /// Handle a request to initialize a pristine node with an initial cluster membership.
    ///
    /// Only a brand-new node — empty log and still a non-voter — may be initialized this
    /// way; any other state is rejected with `InitWithConfigError::NotAllowed`.
    fn handle(&mut self, mut msg: InitWithConfig, ctx: &mut Self::Context) -> Self::Result {
        let is_pristine = self.last_log_index == 0 && self.state.is_non_voter();
        if !is_pristine {
            warn!("Raft received an InitWithConfig command, but the node is in state {} with index {}.", self.state, self.last_log_index);
            return Box::new(fut::err(InitWithConfigError::NotAllowed));
        }

        // Dedup the proposed member set, and make sure this node is part of it.
        msg = normalize_init_config(msg);
        if !msg.members.contains(&self.id) {
            // NodeId is Copy (it is moved into `voted_for` below), so no `.clone()` needed.
            msg.members.push(self.id);
        }
        self.membership = MembershipConfig{is_in_joint_consensus: false, members: msg.members, non_voters: vec![], removing: vec![]};

        // A single-node cluster can elect itself leader immediately; otherwise kick off
        // a normal election by becoming a candidate.
        if self.membership.members.len() == 1 && self.membership.members[0] == self.id {
            self.current_term += 1;
            self.voted_for = Some(self.id);
            self.become_leader(ctx);
            self.save_hard_state(ctx);
        } else {
            self.become_candidate(ctx);
        }
        Box::new(fut::ok(()))
    }
}
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<ProposeConfigChange<D, R, E>> for Raft<D, R, E, N, S> {
type Result = ResponseActFuture<Self, (), ProposeConfigChangeError<D, R, E>>;
/// Handle a proposal to change the cluster membership via joint consensus.
///
/// Validates and normalizes the proposal, records the joint-consensus state,
/// spawns replication streams for newly added nodes, and submits the joint
/// config as a client payload; once that entry commits, post-commit handling
/// continues in `handle_newly_committed_cluster_config`.
fn handle(&mut self, msg: ProposeConfigChange<D, R, E>, ctx: &mut Self::Context) -> Self::Result {
// Only the leader may drive a membership change; reject otherwise, reporting
// the current leader (if known) so the caller can redirect.
let leader_state = match &mut self.state {
RaftState::Leader(state) => state,
_ => return Box::new(fut::err(ProposeConfigChangeError::NodeNotLeader(self.current_leader.clone()))),
};
// Drop no-op additions/removals; errors out with Noop or InoperableConfig.
let msg = match normalize_proposed_config(msg, &self.membership) {
Ok(msg) => msg,
Err(err) => return Box::new(fut::err(err)),
};
// Track the added nodes in the consensus state so finalization can wait for
// them to catch up. NOTE(review): when already in joint consensus this folds
// additional nodes into the in-flight change and resets `is_committed` —
// confirm accepting a second concurrent change is intended behavior.
match &mut leader_state.consensus_state {
ConsensusState::Joint{new_nodes, is_committed} => {
new_nodes.extend_from_slice(msg.add_members.as_slice());
*is_committed = false;
}
_ => {
leader_state.consensus_state = ConsensusState::Joint{new_nodes: msg.add_members.clone(), is_committed: false};
}
}
// Reflect the joint configuration in the in-memory membership: new nodes
// start as non-voters; removed nodes are marked as "removing".
self.membership.is_in_joint_consensus = true;
self.membership.non_voters.extend_from_slice(msg.add_members.as_slice());
self.membership.removing.extend_from_slice(msg.remove_members.as_slice());
// Spawn a replication stream actor per newly added node so it can begin
// catching up from this leader's log.
for target in msg.add_members {
let rs = ReplicationStream::new(
self.id, target, self.current_term, self.config.clone(),
self.last_log_index, self.last_log_term, self.commit_index,
ctx.address(), self.network.clone(), self.storage.clone().recipient::<GetLogEntries<D, E>>(),
);
let addr = rs.start();
let state = ReplicationState{
addr, match_index: self.last_log_index, remove_after_commit: None,
is_at_line_rate: true,
};
leader_state.nodes.insert(target, state);
}
// Nodes slated for removal that are still non-voters can be dropped right
// away: they hold no vote, so no quorum calculation needs to include them.
for node in msg.remove_members {
if let Some((idx, _)) = self.membership.non_voters.iter().enumerate().find(|(_, e)| *e == &node) {
leader_state.nodes.remove(&node);
self.membership.non_voters.remove(idx);
}
}
self.report_metrics(ctx);
// Submit the joint config entry through the normal client-payload path;
// once it is committed, continue with post-commit handling.
Box::new(fut::wrap_future(ctx.address().send(ClientPayload::new_config(self.membership.clone())))
.map_err(|_, _: &mut Self, _| ProposeConfigChangeError::Internal)
.and_then(|res, _, _| fut::result(res.map_err(|err| ProposeConfigChangeError::ClientError(err))))
.and_then(|res, act, ctx| act.handle_newly_committed_cluster_config(ctx, res))
)
}
}
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Raft<D, R, E, N, S> {
/// Handle the commit of a joint-consensus config entry.
///
/// Marks the joint consensus as committed; if no newly added nodes are still
/// catching up, finalizes the joint consensus immediately. No-op when this
/// node is no longer leader or is not in joint consensus.
pub(super) fn handle_newly_committed_cluster_config(&mut self, ctx: &mut Context<Self>, _: ClientPayloadResponse<R>) -> impl ActorFuture<Actor=Self, Item=(), Error=ProposeConfigChangeError<D, R, E>> {
let leader_state = match &mut self.state {
RaftState::Leader(state) => state,
_ => return fut::ok(()),
};
match &mut leader_state.consensus_state {
ConsensusState::Joint{is_committed, new_nodes} => {
*is_committed = true;
// All added nodes already synced: move straight to the uniform config.
if new_nodes.len() == 0 {
self.finalize_joint_consensus(ctx);
}
}
_ => (),
}
fut::ok(())
}
/// Transition from joint consensus to the final uniform configuration.
///
/// Promotes all non-voters to full members, drops the nodes slated for
/// removal from the member list, switches the consensus state to Uniform,
/// and proposes the final config entry. Only runs when this node is the
/// leader and currently in joint consensus.
pub(super) fn finalize_joint_consensus(&mut self, ctx: &mut Context<Self>) {
let leader_state = match &mut self.state {
RaftState::Leader(state) => match &state.consensus_state {
ConsensusState::Joint{..} => state,
_ => return,
}
_ => return,
};
// Every non-voter has caught up by this point; promote them to members.
for node in self.membership.non_voters.drain(..) {
self.membership.members.push(node);
}
// Drop the nodes slated for removal from the member list.
for node in self.membership.removing.drain(..) {
if let Some((idx, _)) = self.membership.members.iter().enumerate().find(|(_, e)| *e == &node) {
self.membership.members.remove(idx);
}
}
self.membership.is_in_joint_consensus = false;
leader_state.consensus_state = ConsensusState::Uniform;
// Propose the final uniform config entry; errors are logged and dropped
// (best-effort — a failure here leaves the in-memory state already updated).
ctx.spawn(fut::wrap_future(ctx.address().send(ClientPayload::new_config(self.membership.clone())))
.map_err(|err, _, _| error!("Messaging error submitting client payload to finalize joint consensus. {:?}", err))
.and_then(|res, _, _| fut::result(res
.map_err(|err| error!("Error from submitting client payload to finalize joint consensus. {:?}", err))))
.and_then(|res, act: &mut Self, ctx| act.handle_joint_consensus_finalization(ctx, res))
);
}
/// Handle the commit of the final uniform config entry.
///
/// If this node removed itself from the cluster, it steps down to non-voter.
/// Otherwise, replication streams to removed nodes are pruned: streams that
/// have already replicated past the config entry's index are removed now,
/// the rest are marked for removal once they reach that index.
pub(super) fn handle_joint_consensus_finalization(&mut self, ctx: &mut Context<Self>, res: ClientPayloadResponse<R>) -> impl ActorFuture<Actor=Self, Item=(), Error=()> {
// Only relevant while still leader and already in the uniform state.
let leader_state = match &mut self.state {
RaftState::Leader(state) => match &state.consensus_state {
ConsensusState::Uniform => state,
_ => return fut::ok(()),
}
_ => return fut::ok(()),
};
// This node was removed from the cluster by the config change: step down.
if !self.membership.contains(&self.id) {
info!("Node {} is stepping down.", self.id);
self.become_non_voter(ctx);
self.update_current_leader(ctx, UpdateCurrentLeader::Unknown);
return fut::ok(());
}
let membership = &self.membership;
// Partition removed nodes: those caught up past the config entry's index
// can be dropped immediately; stragglers are flagged `remove_after_commit`
// so the replication stream is torn down once they reach it.
let nodes_to_remove: Vec<_> = leader_state.nodes.iter_mut()
.filter(|(id, _)| !membership.contains(id))
.filter_map(|(idx, replstate)| {
if replstate.match_index >= res.index() {
Some(idx.clone())
} else {
replstate.remove_after_commit = Some(res.index());
None
}
}).collect();
for node in nodes_to_remove {
leader_state.nodes.remove(&node);
}
fut::ok(())
}
}
/// Produce a copy of the given init config with duplicate member IDs removed,
/// preserving the order of first appearance.
fn normalize_init_config(msg: InitWithConfig) -> InitWithConfig {
    let mut members = Vec::with_capacity(msg.members.len());
    msg.members.into_iter().for_each(|node| {
        if !members.contains(&node) {
            members.push(node);
        }
    });
    InitWithConfig{members}
}
/// Validate and normalize a proposed config change against the current membership.
///
/// Additions which are already known to the cluster, or which are simultaneously
/// slated for removal, are dropped; removals which target unknown nodes, or nodes
/// already being removed, are dropped. Returns `Noop` when nothing remains to do,
/// and `InoperableConfig` when the change would leave the cluster with fewer than
/// two functioning nodes.
fn normalize_proposed_config<D: AppData, R: AppDataResponse, E: AppError>(mut msg: ProposeConfigChange<D, R, E>, current: &MembershipConfig) -> Result<ProposeConfigChange<D, R, E>, ProposeConfigChangeError<D, R, E>> {
    // Keep only genuinely new nodes that are not simultaneously being removed.
    let proposed_additions = msg.add_members;
    let mut new_nodes = Vec::with_capacity(proposed_additions.len());
    for node in proposed_additions {
        let is_known = current.contains(&node);
        let is_being_removed = msg.remove_members.contains(&node);
        if !is_known && !is_being_removed {
            new_nodes.push(node);
        }
    }

    // Keep only removals that target current nodes not already being removed.
    let proposed_removals = msg.remove_members;
    let mut remove_nodes = Vec::with_capacity(proposed_removals.len());
    for node in proposed_removals {
        if current.contains(&node) && !current.removing.contains(&node) {
            remove_nodes.push(node);
        }
    }

    if new_nodes.is_empty() && remove_nodes.is_empty() {
        return Err(ProposeConfigChangeError::Noop);
    }

    // The resulting cluster must retain at least two functioning nodes. The first
    // branch also guards the subtraction below against usize underflow.
    let total_removing = current.removing.len() + remove_nodes.len();
    let count = current.members.len() + current.non_voters.len() + new_nodes.len();
    if total_removing >= count {
        return Err(ProposeConfigChangeError::InoperableConfig);
    } else if (count - total_removing) < 2 {
        return Err(ProposeConfigChangeError::InoperableConfig);
    }

    msg.add_members = new_nodes;
    msg.remove_members = remove_nodes;
    Ok(msg)
}