use std::cmp;
use std::collections::{BTreeMap, HashMap};
use std::iter::Extend;
use std::time::{Duration, Instant};
use super::compressed_log::CompressedLog;
pub use super::error::*;
pub use super::future::*;
pub use super::serde::*;
/// Timing knobs for a [`Raft`] node; all durations are compared against the
/// `Instant`s delivered through `Input::Tick`.
#[derive(Clone)]
pub struct Config {
    /// How long a follower or candidate may go without communication before
    /// it starts a new election.
    pub election_timeout: Duration,
    /// How long a leader waits, measured in ticks, before sending another
    /// heartbeat.
    pub heartbeat_interval: Duration,
}

impl Default for Config {
    /// 100ms election timeout and 10ms heartbeat interval.
    fn default() -> Self {
        let election_timeout = Duration::from_millis(100);
        let heartbeat_interval = Duration::from_millis(10);
        Self { election_timeout, heartbeat_interval }
    }
}
/// Everything that can drive a [`Raft`] node; fed in through `Raft::step`.
#[derive(Debug)]
pub enum Input {
    /// Client write. A leader appends the payload to its log and keeps the
    /// future until the entry is acknowledged; other roles fill the future
    /// with `NotLeaderError`.
    Write(WriteReq, WriteFuture),
    /// Client read. A leader queues it and later emits a
    /// `ReadStateMachineReq`; other roles fill the future with `NotLeaderError`.
    Read(ReadReq, ReadFuture),
    /// Clock advance. Drives election timeouts and leader heartbeats; ticks at
    /// or before the last accepted time are ignored.
    Tick(Instant),
    /// An RPC from another node (or, for persisted acks, from this node).
    Message(Message),
    /// The embedder's reply to an earlier `Output::PersistReq`.
    PersistRes(PersistRes),
    /// The embedder's reply to an earlier `Output::ReadStateMachineReq`.
    ReadStateMachineRes(ReadStateMachineRes),
}
/// Side effects a [`Raft`] node asks its embedder to perform; appended to the
/// sink passed to `Raft::step`.
#[derive(Debug)]
pub enum Output {
    /// Deliver this message to `message.dest`.
    Message(Message),
    /// Store these log entries, then answer with `Input::PersistRes`
    /// (presumably durably — NOTE(review): confirm the embedder's contract).
    PersistReq(PersistReq),
    /// Carries the node's new `last_applied` index after it advanced.
    ApplyReq(Index),
    /// Evaluate a client read against the state machine, then answer with
    /// `Input::ReadStateMachineRes`.
    ReadStateMachineReq(ReadStateMachineReq),
}
/// Request to persist log entries received from (or written by) a leader.
#[derive(Debug)]
pub struct PersistReq {
    /// Leader to acknowledge once the entries are persisted (may be this node).
    pub leader_id: NodeID,
    /// Read id of the AppendEntries round that carried these entries; echoed
    /// back in the acknowledgement.
    pub read_id: ReadID,
    /// Entries to persist, in log order.
    pub entries: Vec<Entry>,
}
/// Embedder's reply to a [`PersistReq`]; turned into a successful
/// `AppendEntriesRes` addressed to `leader_id`.
#[derive(Debug)]
pub struct PersistRes {
    /// Copied from the originating `PersistReq`.
    pub leader_id: NodeID,
    /// Copied from the originating `PersistReq`.
    pub read_id: ReadID,
    /// Log index reported to the leader as acknowledged.
    pub log_index: Index,
}
/// Leader's request to evaluate a client read; emitted once the read's log
/// index has been applied and leadership for it has been confirmed.
#[derive(Debug)]
pub struct ReadStateMachineReq {
    /// Log index the read was registered at.
    pub index: Index,
    /// Read id identifying the pending read; echo it back in the response.
    pub read_id: ReadID,
    /// The client's read payload.
    pub payload: Vec<u8>,
}
/// Embedder's reply to a [`ReadStateMachineReq`]; `(index, read_id)` selects
/// the pending read whose future receives `payload`.
#[derive(Debug)]
pub struct ReadStateMachineRes {
    /// Copied from the request.
    pub index: Index,
    /// Copied from the request.
    pub read_id: ReadID,
    /// Read result returned to the client.
    pub payload: Vec<u8>,
}
/// A single Raft node driven entirely by `Raft::step`: inputs go in and
/// requested side effects are appended to a caller-supplied `Output` sink.
pub struct Raft {
    // `Some` between calls; it is `take`n transiently inside `step` so the
    // role can be moved between `State` variants by value, and by `shutdown`.
    state: Option<State>,
}
impl Raft {
    /// Creates a node that starts as a candidate in term 0 with an empty log.
    ///
    /// `peers` lists the cluster members. Broadcasts skip any entry equal to
    /// `id`, so listing `id` itself is allowed.
    pub fn new(id: NodeID, peers: Vec<NodeID>, cfg: Config) -> Raft {
        let shared = SharedState {
            id,
            cfg,
            current_term: Term(0),
            voted_for: None,
            log: CompressedLog::new(),
            commit_index: Index(0),
            last_applied: Index(0),
            peers,
            current_time: None,
            last_communication: None,
        };
        let state = State::Candidate(Candidate { shared, received_votes: 0 });
        Raft { state: Some(state) }
    }

    /// Returns this node's id.
    pub fn id(&self) -> NodeID {
        self.state.as_ref().expect("unreachable").id()
    }

    /// Time of the most recently accepted `Input::Tick`, if any.
    #[cfg(test)]
    pub fn current_time(&self) -> Option<Instant> {
        self.state.as_ref().expect("unreachable").shared().current_time
    }

    /// The node's current term.
    #[cfg(test)]
    pub fn current_term(&self) -> Term {
        self.state.as_ref().expect("unreachable").shared().current_term
    }

    /// Name of the current role ("candidate", "follower" or "leader").
    #[cfg(test)]
    fn debug(&self) -> &'static str {
        self.state.as_ref().expect("unreachable").debug()
    }

    /// Feeds one input to the node, appending any resulting side effects to
    /// `output`.
    ///
    /// # Panics
    /// Panics if called after a previous `step` panicked part-way.
    pub fn step(&mut self, output: &mut impl Extend<Output>, input: Input) {
        let state = self.state.take().expect("unreachable");
        self.state = Some(state.step(output, input));
    }

    /// Fails outstanding client requests. Called from `Drop`.
    fn shutdown(&mut self) {
        // `state` is `None` if an earlier `step` panicked part-way (it is taken
        // before the transition runs). `Drop` may be running during that
        // unwind, and panicking again here would abort the process, so treat a
        // missing state as "nothing to clean up".
        if let Some(state) = self.state.take() {
            state.shutdown();
        }
    }
}
/// State common to every role; moved between `Candidate`, `Follower` and
/// `Leader` on role changes.
struct SharedState {
    /// This node's id.
    id: NodeID,
    /// Timing configuration.
    cfg: Config,
    /// Latest term this node has seen.
    current_term: Term,
    /// Candidate voted for in `current_term`, if any; cleared on a term change.
    voted_for: Option<NodeID>,
    /// Replicated log.
    log: CompressedLog,
    /// Highest log index known to be committed.
    commit_index: Index,
    /// Highest log index announced via `Output::ApplyReq`.
    last_applied: Index,
    /// Cluster members; broadcasts skip `id`.
    peers: Vec<NodeID>,
    /// Time of the latest accepted `Tick`.
    current_time: Option<Instant>,
    /// Time the election/heartbeat timer was last reset; `tick` compares
    /// `now` against it using the `cfg` durations.
    last_communication: Option<Instant>,
}
/// A node campaigning for leadership in `shared.current_term`.
struct Candidate {
    shared: SharedState,
    /// Granted votes counted so far in the current election (including our own).
    received_votes: usize,
}
/// The leader of `shared.current_term`, with replication and client bookkeeping.
struct Leader {
    shared: SharedState,
    /// Unused placeholder: follower log repair is not implemented (see the
    /// `todo!()` in `leader_append_entries_res`).
    _next_index: HashMap<NodeID, Index>,
    /// Per node (including this one), the highest `(index, read_id)` acknowledged.
    match_index: HashMap<NodeID, (Index, ReadID)>,
    /// Client futures for written entries, keyed by `(term, index)`.
    write_buffer: HashMap<(Term, Index), WriteFuture>,
    /// Next id to hand out; allocated by every AppendEntries broadcast and
    /// every client read.
    next_read_id: ReadID,
    /// Read id of the latest broadcast not yet confirmed by a majority.
    max_outstanding_read_id: Option<ReadID>,
    /// Highest read id acknowledged by a majority.
    max_confirmed_read_id: Option<ReadID>,
    /// Pending client reads keyed by `(log index at read time, read_id)`; the
    /// request is `take`n (left `None`) once a `ReadStateMachineReq` is emitted.
    read_buffer: BTreeMap<(Index, ReadID), (Option<ReadReq>, ReadFuture)>,
}
/// A node following a leader.
struct Follower {
    shared: SharedState,
    /// Best guess at the current leader; reported to clients via `NotLeaderError`.
    leader_hint: NodeID,
}
// `Leader` carries several maps and is presumably much larger than the other
// variants; it is stored unboxed (NOTE(review): confirm this is deliberate).
#[allow(clippy::large_enum_variant)]
/// The role a node currently plays. `State` methods consume the value and
/// return the next role, so role changes are moves rather than mutations.
enum State {
    Candidate(Candidate),
    Follower(Follower),
    Leader(Leader),
}
impl State {
fn id(&self) -> NodeID {
    // Every role carries the same shared state, so the id is role-independent.
    let shared = self.shared();
    shared.id
}
/// Borrows the role-independent state of whichever variant is active.
fn shared(&self) -> &SharedState {
    match *self {
        State::Leader(ref leader) => &leader.shared,
        State::Follower(ref follower) => &follower.shared,
        State::Candidate(ref candidate) => &candidate.shared,
    }
}
/// Mutably borrows the role-independent state of whichever variant is active.
fn shared_mut(&mut self) -> &mut SharedState {
    match *self {
        State::Leader(ref mut leader) => &mut leader.shared,
        State::Follower(ref mut follower) => &mut follower.shared,
        State::Candidate(ref mut candidate) => &mut candidate.shared,
    }
}
#[cfg(test)]
/// Human-readable name of the active role, for tests.
fn debug(&self) -> &'static str {
    match *self {
        State::Leader(..) => "leader",
        State::Follower(..) => "follower",
        State::Candidate(..) => "candidate",
    }
}
/// Routes one input to its handler and returns the resulting role.
fn step(self, output: &mut impl Extend<Output>, input: Input) -> State {
    // The role name is spelled out here instead of calling `self.debug()`:
    // that helper only exists under `cfg(test)`, so using it in a log line
    // breaks non-test builds that enable the `log` feature.
    debug!(
        " {:3}: step {:?}",
        self.id().0,
        match &self {
            State::Candidate(_) => "candidate",
            State::Follower(_) => "follower",
            State::Leader(_) => "leader",
        }
    );
    match input {
        Input::Write(req, res) => self.write(output, req.payload, Some(res)),
        Input::Read(req, res) => self.read(output, req, res),
        Input::Tick(now) => self.tick(output, now),
        Input::PersistRes(res) => self.persist_res(output, res),
        Input::ReadStateMachineRes(res) => self.read_state_machine_res(output, res),
        Input::Message(message) => self.message(output, message),
    }
}
/// Completes every buffered client write whose entry is now committed
/// (`index <= commit_index`), filling its future with `Ok(WriteRes)` and
/// removing it from `write_buffer`. Writes beyond the commit index stay
/// buffered until a later commit advance.
fn maybe_wake_writes(mut leader: Leader) -> Leader {
    let current_term = leader.shared.current_term;
    let commit_index = leader.shared.commit_index;
    #[cfg(feature = "log")]
    let id = leader.shared.id;
    leader.write_buffer.retain(|(term, index), future| {
        // The buffer is cleared on step-down, so it only holds this term's writes.
        debug_assert!(*term == current_term);
        // Not committed yet: keep waiting. (The comparison used to be inverted,
        // which acknowledged uncommitted writes and stranded committed ones.)
        if *index > commit_index {
            return true;
        }
        let res = WriteRes { term: *term, index: *index };
        debug!(" {:3}: write success {:?}", id.0, res);
        future.fill(Ok(res));
        // `retain` drops entries for which the closure returns false.
        false
    });
    leader
}
/// Moves `last_applied` forward to `commit_index`, capped at `upper_bound`
/// when one is given, and emits a single `Output::ApplyReq` with the new value
/// if it actually advanced. `last_applied` never moves backwards.
fn maybe_apply(
    shared: &mut SharedState,
    output: &mut impl Extend<Output>,
    upper_bound: Option<Index>,
) {
    debug!(
        " {:3}: maybe_apply commit_index={:?} last_applied={:?} upper_bound={:?}",
        shared.id.0, shared.commit_index, shared.last_applied, upper_bound,
    );
    let target = match upper_bound {
        Some(bound) => cmp::min(bound, shared.commit_index),
        None => shared.commit_index,
    };
    if target <= shared.last_applied {
        return;
    }
    shared.last_applied = target;
    output.extend(vec![Output::ApplyReq(target)]);
}
/// Makes progress on pending client reads.
///
/// If the newest pending read is neither covered by a majority-confirmed
/// read id nor by an in-flight broadcast, a heartbeat is sent to confirm
/// leadership. Then every read ordered before `(last_applied,
/// max_confirmed_read_id)` that has not been dispatched yet gets a
/// `ReadStateMachineReq`.
fn leader_maybe_advance_reads(mut leader: Leader, output: &mut impl Extend<Output>) -> Leader {
    debug!(
        " {:3}: leader_maybe_advance_reads max_applied={:?} outstanding={:?} confirmed={:?} read_buffer={:?}",
        leader.shared.id.0,
        leader.shared.last_applied,
        leader.max_outstanding_read_id,
        leader.max_confirmed_read_id,
        leader.read_buffer.keys()
    );
    let newest_read = leader.read_buffer.keys().next_back().map(|&(_, read_id)| read_id);
    if let Some(newest) = newest_read {
        let unconfirmed = leader.max_confirmed_read_id.map_or(true, |confirmed| newest > confirmed);
        let not_in_flight =
            leader.max_outstanding_read_id.map_or(true, |outstanding| newest > outstanding);
        let need_heartbeat = unconfirmed && not_in_flight;
        debug!(" {:3}: need_heartbeat={:?}", leader.shared.id.0, need_heartbeat);
        if need_heartbeat {
            leader = State::leader_heartbeat(leader, output);
        }
    }
    let confirmed = leader.max_confirmed_read_id.unwrap_or(ReadID(0));
    let can_serve = (leader.shared.last_applied, confirmed);
    debug!(" {:3}: can_serve={:?}", leader.shared.id.0, can_serve);
    // Each read is dispatched at most once: its request is taken, leaving
    // only the future behind until the response arrives.
    output.extend(leader.read_buffer.range_mut(..can_serve).filter_map(
        |(&(index, read_id), (pending, _))| {
            pending.take().map(|req| {
                Output::ReadStateMachineReq(ReadStateMachineReq {
                    index,
                    read_id,
                    payload: req.payload,
                })
            })
        },
    ));
    leader
}
fn write(
self,
output: &mut impl Extend<Output>,
payload: Vec<u8>,
mut res: Option<WriteFuture>,
) -> State {
#[cfg(feature = "log")]
match std::str::from_utf8(&payload) {
Ok(payload) => {
debug!(" {:3}: write {:?}", self.id().0, payload);
}
Err(_) => {
debug!(" {:3}: write {:?}", self.id().0, payload);
}
}
match self {
State::Leader(leader) => {
State::Leader(State::leader_write(leader, output, vec![(payload, res)]))
}
State::Candidate(candidate) => match candidate.shared.voted_for {
Some(voted_for) => {
if let Some(mut res) = res.take() {
res.fill(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(voted_for)))));
};
return State::Candidate(candidate);
}
None => {
let state = State::start_election(candidate, output);
state.write(output, payload, res)
}
},
State::Follower(follower) => {
if let Some(mut res) = res.take() {
res.fill(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(
follower.leader_hint,
)))));
};
State::Follower(follower)
}
}
}
/// Broadcasts an AppendEntries carrying no entries; it still allocates a
/// fresh read id, so it doubles as a leadership confirmation round.
fn leader_heartbeat(leader: Leader, output: &mut impl Extend<Output>) -> Leader {
    let no_payloads = Vec::new();
    State::leader_write(leader, output, no_payloads)
}
/// Appends `payloads` to the leader's log and broadcasts them.
///
/// Allocates a new read id for this round and marks it outstanding. Each
/// payload becomes an entry in the current term right after the current
/// last index; supplied futures are parked in `write_buffer`. Non-empty
/// rounds ask the embedder to persist the entries (the leader's own ack
/// arrives through `persist_res`); empty rounds ack the leader immediately.
/// Finally an AppendEntriesReq goes to every other node.
fn leader_write(
    mut leader: Leader,
    output: &mut impl Extend<Output>,
    payloads: Vec<(Vec<u8>, Option<WriteFuture>)>,
) -> Leader {
    let (prev_log_term, prev_log_index) = leader.shared.log.last();
    let read_id = leader.next_read_id;
    leader.next_read_id = ReadID(read_id.0 + 1);
    leader.max_outstanding_read_id = Some(read_id);
    let term = leader.shared.current_term;
    let mut entries = Vec::with_capacity(payloads.len());
    for (offset, (payload, res)) in payloads.into_iter().enumerate() {
        let index = prev_log_index + offset as u64 + 1;
        debug_assert!(leader.write_buffer.get(&(term, index)).is_none());
        if let Some(future) = res {
            leader.write_buffer.insert((term, index), future);
        }
        entries.push(Entry { term, index, payload });
    }
    debug!(" {:3}: entries={:?}", leader.shared.id.0, entries);
    leader.shared.log.extend(&entries);
    debug!(" {:3}: persist {:?}", leader.shared.id.0, &entries);
    if entries.is_empty() {
        let id = leader.shared.id;
        leader = State::ack_term_index(leader, output, id, prev_log_index, read_id);
    } else {
        let persist =
            PersistReq { leader_id: leader.shared.id, read_id, entries: entries.clone() };
        output.extend(vec![Output::PersistReq(persist)]);
    }
    let request = AppendEntriesReq {
        term: leader.shared.current_term,
        leader_id: leader.shared.id,
        prev_log_index,
        prev_log_term,
        leader_commit: leader.shared.commit_index,
        entries,
        read_id,
    };
    State::message_to_all_other_nodes(&leader.shared, output, &Payload::AppendEntriesReq(request));
    leader
}
/// Handles a client read.
///
/// A leader queues it (see `leader_read`). A follower, or a candidate that
/// has already voted this term, fills `res` with `NotLeaderError` naming its
/// leader hint / the node it voted for. A candidate that has not voted yet
/// starts an election and retries the read in its new role.
fn read(self, output: &mut impl Extend<Output>, req: ReadReq, mut res: ReadFuture) -> State {
    debug!(" {:3}: read {:?}", self.id().0, req);
    match self {
        State::Leader(leader) => State::Leader(State::leader_read(leader, output, req, res)),
        State::Follower(follower) => {
            let hint = follower.leader_hint;
            res.fill(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(hint)))));
            State::Follower(follower)
        }
        State::Candidate(candidate) => {
            if let Some(voted_for) = candidate.shared.voted_for {
                res.fill(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(voted_for)))));
                State::Candidate(candidate)
            } else {
                State::start_election(candidate, output).read(output, req, res)
            }
        }
    }
}
/// Registers a client read at the leader's current last log index under a
/// fresh read id, then tries to make progress on pending reads.
fn leader_read(
    mut leader: Leader,
    output: &mut impl Extend<Output>,
    req: ReadReq,
    res: ReadFuture,
) -> Leader {
    let read_id = leader.next_read_id;
    leader.next_read_id = ReadID(read_id.0 + 1);
    let (_, last_index) = leader.shared.log.last();
    leader.read_buffer.insert((last_index, read_id), (Some(req), res));
    State::leader_maybe_advance_reads(leader, output)
}
fn tick(self, output: &mut impl Extend<Output>, now: Instant) -> State {
debug!(
" {:3}: self.tick={:?} current_time={:?}",
self.shared().id.0,
now,
self.shared().current_time
);
if self.shared().current_time.map_or(false, |current_time| now <= current_time) {
return self;
}
match self {
State::Candidate(mut candidate) => {
let timed_out = candidate.shared.last_communication.map_or(true, |last_communication| {
now.duration_since(last_communication) >= candidate.shared.cfg.election_timeout
});
candidate.shared.current_time = Some(now);
if timed_out {
return State::start_election(candidate, output);
}
State::Candidate(candidate)
}
State::Follower(mut follower) => {
let timed_out = follower.shared.last_communication.map_or(true, |last_communication| {
now.duration_since(last_communication) >= follower.shared.cfg.election_timeout
});
follower.shared.current_time = Some(now);
if timed_out {
return State::follower_convert_to_candidate(follower, output);
}
State::Follower(follower)
}
State::Leader(mut leader) => {
let need_heartbeat = leader.shared.last_communication.map_or(true, |last_communication| {
now.duration_since(last_communication) >= leader.shared.cfg.heartbeat_interval
});
leader.shared.current_time = Some(now);
if need_heartbeat {
leader = State::leader_heartbeat(leader, output)
}
State::Leader(leader)
}
}
}
/// Turns a completed persist into a successful `AppendEntriesRes` for the
/// leader that requested it. When this node is that leader, the ack is fed
/// straight back in as a message instead of being emitted.
fn persist_res(self, output: &mut impl Extend<Output>, res: PersistRes) -> State {
    let ack = AppendEntriesRes {
        term: self.shared().current_term,
        success: true,
        index: res.log_index,
        read_id: res.read_id,
    };
    let msg = Message { src: self.id(), dest: res.leader_id, payload: Payload::AppendEntriesRes(ack) };
    if msg.src != msg.dest {
        output.extend(vec![Output::Message(msg)]);
        return self;
    }
    State::step(self, output, Input::Message(msg))
}
fn read_state_machine_res(
self,
output: &mut impl Extend<Output>,
res: ReadStateMachineRes,
) -> State {
let mut leader = match self {
State::Leader(leader) => leader,
State::Candidate(candidate) => return State::Candidate(candidate),
State::Follower(follower) => return State::Follower(follower),
};
let index = res.index;
let payload = res.payload;
if let Some((_, mut res)) = leader.read_buffer.remove(&(res.index, res.read_id)) {
#[cfg(feature = "log")]
match std::str::from_utf8(&payload) {
Ok(payload) => {
debug!(" {:3}: read success {:?}", leader.shared.id.0, payload);
}
Err(_) => {
debug!(" {:3}: read success {:?}", leader.shared.id.0, payload);
}
}
res.fill(Ok(ReadRes { term: leader.shared.current_term, index: index, payload: payload }));
}
leader = State::leader_maybe_apply(leader, output);
State::Leader(leader)
}
/// Applies committed entries, but never past the log index of the oldest
/// pending read, so that read still sees the state at its own index.
fn leader_maybe_apply(mut leader: Leader, output: &mut impl Extend<Output>) -> Leader {
    let lowest_pending_read = leader.read_buffer.keys().next().map(|&(index, _)| index);
    State::maybe_apply(&mut leader.shared, output, lowest_pending_read);
    leader
}
/// Applies everything committed; followers hold no pending reads, so there
/// is no cap.
fn follower_maybe_apply(mut follower: Follower, output: &mut impl Extend<Output>) -> Follower {
    let no_cap = None;
    State::maybe_apply(&mut follower.shared, output, no_cap);
    follower
}
/// Handles an incoming RPC.
///
/// A message from a newer term first makes this node adopt that term, forget
/// its vote and become a follower (hinting the sender as leader). The message
/// is then dispatched to the handler for the resulting role.
fn message(mut self, output: &mut impl Extend<Output>, message: Message) -> State {
    let message_term = match &message.payload {
        Payload::AppendEntriesReq(req) => req.term,
        Payload::AppendEntriesRes(res) => res.term,
        Payload::RequestVoteReq(req) => req.term,
        Payload::RequestVoteRes(res) => res.term,
        Payload::StartElectionReq(req) => req.term,
    };
    if message_term > self.shared().current_term {
        let shared = self.shared_mut();
        shared.current_term = message_term;
        shared.voted_for = None;
        self = State::Follower(self.convert_to_follower(output, message.src));
    }
    match self {
        State::Leader(leader) => State::leader_step(leader, output, message),
        State::Follower(follower) => State::follower_step(follower, output, message),
        State::Candidate(candidate) => State::candidate_step(candidate, output, message),
    }
}
/// Dispatches a message received while candidate.
///
/// Strictly newer terms were already handled in `message` (step down), so
/// every message here is from our term or an older one.
fn candidate_step(
    candidate: Candidate,
    output: &mut impl Extend<Output>,
    message: Message,
) -> State {
    match &message.payload {
        Payload::RequestVoteRes(res) => {
            State::candidate_process_request_vote_res(candidate, output, res)
        }
        Payload::AppendEntriesReq(req) => {
            // Raft §5.2: AppendEntries from a leader whose term is at least
            // ours means that leader won this election. Step down and let the
            // follower logic process the request. The equal-term case is the
            // one that reaches here; it used to be ignored, leaving the
            // candidate to time out and disrupt the new leader.
            if req.term >= candidate.shared.current_term {
                let follower = State::candidate_convert_to_follower(candidate, output, message.src);
                return State::Follower(follower).step(output, Input::Message(message));
            }
            State::Candidate(candidate)
        }
        Payload::RequestVoteReq(req) => State::Candidate(candidate).process_request_vote(output, req),
        Payload::StartElectionReq(req) => {
            if req.term < candidate.shared.current_term {
                return State::Candidate(candidate);
            }
            State::start_election(candidate, output)
        }
        // A late reply to AppendEntries sent while this node led an earlier
        // term; there is nothing to replicate as a candidate, so ignore it
        // instead of panicking.
        Payload::AppendEntriesRes(_) => State::Candidate(candidate),
    }
}
/// Dispatches a message received while follower.
fn follower_step(
    follower: Follower,
    output: &mut impl Extend<Output>,
    message: Message,
) -> State {
    match message.payload {
        Payload::AppendEntriesReq(req) => {
            let follower = State::follower_append_entries(follower, output, req);
            State::Follower(follower)
        }
        Payload::RequestVoteReq(req) => State::Follower(follower).process_request_vote(output, &req),
        Payload::StartElectionReq(req) => {
            // Only a request for our term (or newer) may trigger an election.
            if req.term >= follower.shared.current_term {
                State::follower_convert_to_candidate(follower, output)
            } else {
                State::Follower(follower)
            }
        }
        // Responses are only acted on by leaders and candidates.
        Payload::AppendEntriesRes(_) | Payload::RequestVoteRes(_) => State::Follower(follower),
    }
}
fn leader_step(leader: Leader, output: &mut impl Extend<Output>, message: Message) -> State {
match message.payload {
Payload::AppendEntriesRes(res) => {
State::Leader(State::leader_append_entries_res(leader, output, message.src, res))
}
Payload::RequestVoteRes(_) => {
State::Leader(leader)
}
Payload::StartElectionReq(_) => {
State::Leader(leader)
}
payload => todo!("{:?}", payload),
}
}
/// Handles an AppendEntries RPC while follower (Raft Figure 2, receiver side).
///
/// - Older term: reply `success: false` with our term.
/// - Otherwise the sender leads our term. Record it as `leader_hint` and
///   reset the election timer.
/// - Our log lacks `prev_log_index` with `prev_log_term`: reply `success: false`.
/// - With entries: append them and ask the embedder to persist them. The
///   success reply is sent later, from `persist_res`.
/// - Heartbeat (no entries): reply success at `prev_log_index` immediately.
/// - Finally advance `commit_index` to `min(leader_commit, last log index)`
///   and apply.
fn follower_append_entries(
    mut follower: Follower,
    output: &mut impl Extend<Output>,
    req: AppendEntriesReq,
) -> Follower {
    let src = follower.shared.id;
    let term = follower.shared.current_term;
    let leader_id = req.leader_id;
    let read_id = req.read_id;
    let rejection = move || {
        let payload = Payload::AppendEntriesRes(AppendEntriesRes {
            term,
            index: Index(0),
            success: false,
            read_id,
        });
        Output::Message(Message { src, dest: leader_id, payload })
    };
    if req.term < term {
        output.extend(vec![rejection()]);
        return follower;
    }
    // The sender is the legitimate leader of our term. Keep the hint current
    // so redirected clients find it; otherwise the hint only changes on
    // step-down and can keep naming a losing candidate.
    follower.leader_hint = leader_id;
    debug!(
        " {:3}: self.last_communication={:?}",
        follower.shared.id.0, follower.shared.current_time
    );
    follower.shared.last_communication = follower.shared.current_time;
    let log_match = follower
        .shared
        .log
        .index_term(req.prev_log_index)
        .map_or(false, |entry_term| entry_term == req.prev_log_term);
    if !log_match {
        output.extend(vec![rejection()]);
        return follower;
    }
    if req.entries.is_empty() {
        let payload = Payload::AppendEntriesRes(AppendEntriesRes {
            term,
            success: true,
            index: req.prev_log_index,
            read_id,
        });
        output.extend(vec![Output::Message(Message { src, dest: leader_id, payload })]);
    } else {
        follower.shared.log.extend(&req.entries);
        let msg = PersistReq { leader_id, read_id, entries: req.entries };
        output.extend(vec![Output::PersistReq(msg)]);
    }
    if req.leader_commit > follower.shared.commit_index {
        let (_, last_entry_index) = follower.shared.log.last();
        follower.shared.commit_index = cmp::min(req.leader_commit, last_entry_index);
        follower = State::follower_maybe_apply(follower, output);
    }
    follower
}
/// Processes a follower's AppendEntries reply. Successful replies are
/// recorded as acknowledgements. Failures would need log repair (backing off
/// `next_index` and resending), which is not implemented.
fn leader_append_entries_res(
    leader: Leader,
    output: &mut impl Extend<Output>,
    src: NodeID,
    res: AppendEntriesRes,
) -> Leader {
    if !res.success {
        todo!()
    }
    State::ack_term_index(leader, output, src, res.index, res.read_id)
}
/// Records that `src` has acknowledged log `index` and read id `read_id`.
///
/// Then:
/// 1. If a majority has acknowledged, confirms the majority read id and
///    advances pending reads.
/// 2. Finds the highest log index that is both from the current term and
///    replicated on a majority (Raft §5.4.2). If it is beyond `commit_index`,
///    commits it, applies, completes writes and advances reads.
fn ack_term_index(
    mut leader: Leader,
    output: &mut impl Extend<Output>,
    src: NodeID,
    index: Index,
    read_id: ReadID,
) -> Leader {
    debug!(" {:3}: self.ack_term_index src={:?} index={:}", leader.shared.id.0, src, index.0);
    leader
        .match_index
        .entry(src)
        .and_modify(|index_read_id| *index_read_id = cmp::max(*index_read_id, (index, read_id)))
        .or_insert((index, read_id));
    let majority = State::majority(&leader.shared);
    let mut read_ids: Vec<ReadID> = leader.match_index.values().map(|&(_, read_id)| read_id).collect();
    read_ids.sort_unstable();
    debug!(" {:3}: read_ids={:?}", leader.shared.id.0, &read_ids);
    if read_ids.len() >= majority {
        // Sorted ascending: the value `majority` places from the top is held
        // (or exceeded) by at least a majority of nodes.
        let new_max_confirmed_read_id = read_ids.get(read_ids.len() - majority).copied();
        debug!(
            " {:3}: read_ids={:?} new_confirmed={:?}",
            leader.shared.id.0, &read_ids, new_max_confirmed_read_id
        );
        debug_assert!(
            new_max_confirmed_read_id >= leader.max_confirmed_read_id,
            "{:?} vs {:?}",
            new_max_confirmed_read_id,
            leader.max_confirmed_read_id
        );
        leader.max_confirmed_read_id = new_max_confirmed_read_id;
        debug!(
            " {:3}: outstanding={:?} confirmed={:?}",
            leader.shared.id.0, leader.max_outstanding_read_id, leader.max_confirmed_read_id
        );
        if new_max_confirmed_read_id >= leader.max_outstanding_read_id {
            debug!(" {:3}: no outstanding reads", leader.shared.id.0);
            leader.max_outstanding_read_id = None;
        }
        leader = State::leader_maybe_advance_reads(leader, output);
    }
    debug!(
        " {:3}: match_indexes={:?}",
        leader.shared.id.0,
        leader.match_index.values().map(|&(index, _)| index).collect::<Vec<_>>(),
    );
    let current_term = leader.shared.current_term;
    let commit_index = leader.shared.commit_index;
    let mut new_commit_index = None;
    for (entry_term, entry_index) in leader.shared.log.iter().rev() {
        debug!(
            " {:3}: is committed? term={:} index={:} current_term={:} commit_index={:}",
            leader.shared.id.0,
            entry_term.0,
            entry_index.0,
            current_term.0,
            commit_index.0,
        );
        if entry_index <= commit_index {
            break;
        }
        // Raft §5.4.2: a leader may only commit entries from its own term by
        // counting replicas; earlier-term entries become committed implicitly
        // once a current-term entry after them commits. Walking backwards,
        // terms only decrease, so nothing older can qualify either.
        if entry_term != current_term {
            break;
        }
        let count = leader
            .match_index
            .values()
            .filter(|&&(acked, _)| acked >= entry_index)
            .count();
        if count >= majority {
            new_commit_index = Some(entry_index);
            break;
        }
    }
    // Commit side effects run after the loop so the log is no longer borrowed
    // while `leader` is moved through the helpers.
    if let Some(new_commit_index) = new_commit_index {
        debug!(" {:3}: new_commit_index={:?}", leader.shared.id.0, new_commit_index);
        leader.shared.commit_index = new_commit_index;
        leader = State::leader_maybe_apply(leader, output);
        leader = State::maybe_wake_writes(leader);
        leader = State::leader_maybe_advance_reads(leader, output);
    }
    leader
}
/// Handles a RequestVote RPC in any role.
///
/// - Older term: reply `vote_granted: false` with our term.
/// - Otherwise grant the vote only if we have not voted for someone else this
///   term (Raft §5.2) and the candidate's log is at least as up-to-date as ours
///   (Raft §5.4.1: compare last log terms, then last log indexes). Granting
///   records the vote, resets the election timer and replies
///   `vote_granted: true`. Refusals for the current term send no reply.
fn process_request_vote(
    mut self,
    output: &mut impl Extend<Output>,
    req: &RequestVoteReq,
) -> State {
    let shared = self.shared_mut();
    debug!(
        " {:3}: self.process_request_vote voted_for={:?} req={:?}",
        shared.id.0, shared.voted_for, req
    );
    if req.term < shared.current_term {
        let payload =
            Payload::RequestVoteRes(RequestVoteRes { term: shared.current_term, vote_granted: false });
        let msg = Output::Message(Message { src: shared.id, dest: req.candidate_id, payload });
        output.extend(vec![msg]);
        return self;
    }
    let vote_available = shared.voted_for.map_or(true, |voted_for| voted_for == req.candidate_id);
    // Election restriction: a candidate missing committed entries must not
    // win, or it could overwrite them once elected.
    let candidate_up_to_date = (req.last_log_term, req.last_log_index) >= shared.log.last();
    if vote_available && candidate_up_to_date {
        shared.voted_for = Some(req.candidate_id);
        // Granting a vote resets the election timer (Raft Figure 2,
        // "Followers"), so we do not immediately start a competing election.
        shared.last_communication = shared.current_time;
        let payload =
            Payload::RequestVoteRes(RequestVoteRes { term: shared.current_term, vote_granted: true });
        let msg = Output::Message(Message { src: shared.id, dest: req.candidate_id, payload });
        output.extend(vec![msg]);
    }
    self
}
/// Counts a vote response and becomes leader once a majority is reached.
///
/// Only grants cast for the candidate's current term count. A late grant from
/// an earlier election must not help win this one, or a node could become
/// leader without a majority for this term.
/// NOTE(review): a duplicated delivery from the same voter is still counted
/// twice; tracking voter ids would need a field on `Candidate`.
fn candidate_process_request_vote_res(
    mut candidate: Candidate,
    output: &mut impl Extend<Output>,
    res: &RequestVoteRes,
) -> State {
    if !res.vote_granted || res.term != candidate.shared.current_term {
        return State::Candidate(candidate);
    }
    candidate.received_votes += 1;
    if candidate.received_votes >= State::majority(&candidate.shared) {
        return State::Leader(State::candidate_convert_to_leader(candidate, output));
    }
    State::Candidate(candidate)
}
/// Begins a new election: bumps the term, votes for itself, resets the
/// election timer and asks every other node for a vote. Its own vote then goes
/// through the normal counting path, so a candidate whose own vote is already
/// a majority becomes leader immediately.
fn start_election(mut candidate: Candidate, output: &mut impl Extend<Output>) -> State {
    debug!(" {:3}: start_election {:?}", candidate.shared.id.0, candidate.shared.current_time);
    {
        let shared = &mut candidate.shared;
        shared.current_term = Term(shared.current_term.0 + 1);
        shared.voted_for = Some(shared.id);
        shared.last_communication = shared.current_time;
    }
    candidate.received_votes = 0;
    let term = candidate.shared.current_term;
    let (last_log_term, last_log_index) = candidate.shared.log.last();
    let request = Payload::RequestVoteReq(RequestVoteReq {
        term,
        candidate_id: candidate.shared.id,
        last_log_index,
        last_log_term,
    });
    debug!(" {:3}: reqvote {:?}", candidate.shared.id.0, request);
    State::message_to_all_other_nodes(&candidate.shared, output, &request);
    let own_vote = RequestVoteRes { term, vote_granted: true };
    State::candidate_process_request_vote_res(candidate, output, &own_vote)
}
/// Emits one `Output::Message` carrying a clone of `payload` to every peer
/// except this node, in `peers` order.
fn message_to_all_other_nodes(
    shared: &SharedState,
    output: &mut impl Extend<Output>,
    payload: &Payload,
) {
    let me = shared.id;
    let others = shared.peers.iter().copied().filter(|&peer| peer != me);
    output.extend(others.map(|dest| {
        Output::Message(Message { src: me, dest, payload: payload.clone() })
    }));
}
fn follower_convert_to_candidate(follower: Follower, output: &mut impl Extend<Output>) -> State {
debug!(" {:3}: convert_to_candidate", follower.shared.id.0);
let candidate = Candidate { shared: follower.shared, received_votes: 0 };
State::start_election(candidate, output)
}
/// Converts any role into a follower. Candidates and leaders adopt
/// `new_leader_hint`; an existing follower is returned unchanged, keeping
/// its current hint.
fn convert_to_follower(
    self,
    output: &mut impl Extend<Output>,
    new_leader_hint: NodeID,
) -> Follower {
    match self {
        State::Follower(follower) => follower,
        State::Leader(leader) => State::leader_convert_to_follower(leader, output, new_leader_hint),
        State::Candidate(candidate) => {
            State::candidate_convert_to_follower(candidate, output, new_leader_hint)
        }
    }
}
fn candidate_convert_to_follower(
candidate: Candidate,
_output: &mut impl Extend<Output>,
new_leader_hint: NodeID,
) -> Follower {
debug!(" {:3}: convert_to_follower leader={:?}", candidate.shared.id.0, new_leader_hint.0);
Follower { shared: candidate.shared, leader_hint: new_leader_hint }
}
fn leader_convert_to_follower(
mut leader: Leader,
_output: &mut impl Extend<Output>,
new_leader_hint: NodeID,
) -> Follower {
debug!(" {:3}: convert_to_follower leader={:?}", leader.shared.id.0, new_leader_hint.0);
leader = State::clear_outstanding_requests(leader, Some(new_leader_hint));
Follower { shared: leader.shared, leader_hint: new_leader_hint }
}
fn candidate_convert_to_leader(candidate: Candidate, output: &mut impl Extend<Output>) -> Leader {
debug!(" {:3}: convert_to_leader", candidate.shared.id.0);
let leader = Leader {
shared: candidate.shared,
next_read_id: ReadID(0),
_next_index: HashMap::new(),
match_index: HashMap::new(),
write_buffer: HashMap::new(),
max_outstanding_read_id: None,
max_confirmed_read_id: None,
read_buffer: BTreeMap::new(),
};
State::leader_heartbeat(leader, output)
}
/// Fails every buffered client write and read with `NotLeaderError` carrying
/// `new_leader_hint`, then empties both buffers.
fn clear_outstanding_requests(mut leader: Leader, new_leader_hint: Option<NodeID>) -> Leader {
    let not_leader = || ClientError::NotLeaderError(NotLeaderError::new(new_leader_hint));
    for (_, mut future) in leader.write_buffer.drain() {
        future.fill(Err(not_leader()));
    }
    for (_, (_, future)) in leader.read_buffer.iter_mut() {
        future.fill(Err(not_leader()));
    }
    leader.read_buffer.clear();
    leader
}
/// Tears the node down. Only a leader holds client futures; those are failed
/// without a leader hint.
fn shutdown(self) {
    if let State::Leader(leader) = self {
        let _ = State::clear_outstanding_requests(leader, None);
    }
}
/// Smallest number of nodes forming a strict majority of the cluster.
///
/// `peers` is treated as the full membership (broadcasts skip `id`, so it may
/// include this node). The previous `(n + 1) / 2` matched this for odd `n`
/// but gave exactly half for even `n` (2 nodes -> 1, 4 -> 2). That let two
/// disjoint halves each elect a leader or commit entries.
fn majority(shared: &SharedState) -> usize {
    shared.peers.len() / 2 + 1
}
}
impl Drop for Raft {
    /// Runs `shutdown`, so a leader fails any client futures it still holds.
    fn drop(&mut self) {
        self.shutdown();
    }
}
// Unit tests live in the sibling file `raft_tests.rs`.
#[cfg(test)]
#[path = "raft_tests.rs"]
mod tests;