use self::appender::LogAppender;
use self::follower::FollowersManager;
use super::{Common, NextState};
use election::Role;
use log::{LogEntry, LogIndex, LogSuffix, ProposalId};
use message::{Message, SequenceNumber};
use {ErrorKind, Io, Result};
mod appender;
mod follower;
pub struct Leader<IO: Io> {
followers: FollowersManager<IO>,
appender: LogAppender<IO>,
commit_lower_bound: LogIndex,
}
impl<IO: Io> Leader<IO> {
pub fn new(common: &mut Common<IO>) -> Self {
common.set_timeout(Role::Leader);
let term_start_index = common.log().tail().index;
let followers = FollowersManager::new(common.config().clone());
let mut appender = LogAppender::new();
let noop = LogEntry::Noop {
term: common.term(),
};
appender.append(common, vec![noop]);
Leader {
followers,
appender,
commit_lower_bound: term_start_index,
}
}
pub fn handle_timeout(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
self.broadcast_empty_entries(common);
Ok(None)
}
pub fn handle_message(
&mut self,
common: &mut Common<IO>,
message: Message,
) -> Result<NextState<IO>> {
if let Message::AppendEntriesReply(reply) = message {
let updated = self.followers.handle_append_entries_reply(&common, &reply);
track!(self.followers.log_sync(common, &reply))?;
if updated {
track!(self.handle_committed_log(common))?;
}
}
Ok(None)
}
pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
while let Some(appended) = track!(self.appender.run_once(common))? {
for e in &appended.entries {
if let LogEntry::Config { ref config, .. } = *e {
self.followers.handle_config_updated(config);
if self.commit_lower_bound < common.log().committed_tail().index {
self.commit_lower_bound = common.log().committed_tail().index;
}
}
}
self.broadcast_slice(common, appended);
}
track!(self.handle_change_config(common))?;
track!(self.followers.run_once(common))?;
Ok(None)
}
pub fn propose(&mut self, common: &mut Common<IO>, entry: LogEntry) -> ProposalId {
let proposal_id = self.next_proposal_id(common);
self.appender.append(common, vec![entry]);
proposal_id
}
pub fn heartbeat_syn(&mut self, common: &mut Common<IO>) -> SequenceNumber {
let seq_no = common.next_seq_no();
self.broadcast_empty_entries(common);
seq_no
}
pub fn proposal_queue_len(&self, common: &Common<IO>) -> usize {
self.appender.unappended_log_tail(common) - common.log().tail().index
}
pub fn last_heartbeat_ack(&self) -> SequenceNumber {
self.followers.latest_hearbeat_ack()
}
fn handle_change_config(&mut self, common: &mut Common<IO>) -> Result<()> {
if common.config().state().is_stable() {
return Ok(());
}
if self.appender.is_busy() {
return Ok(());
}
let committed = self.followers.committed_log_tail();
if committed < common.log().last_record().head.index {
return Ok(());
}
let joint_committed = self.followers.joint_committed_log_tail();
if joint_committed == committed {
let term = common.term();
let config = common.config().to_next_state();
let entry = LogEntry::Config { term, config };
self.propose(common, entry);
}
Ok(())
}
fn next_proposal_id(&self, common: &Common<IO>) -> ProposalId {
let term = common.term();
let index = self.appender.unappended_log_tail(common);
ProposalId { term, index }
}
fn broadcast_slice(&mut self, common: &mut Common<IO>, slice: LogSuffix) {
self.followers
.set_last_broadcast_seq_no(common.next_seq_no());
common.set_timeout(Role::Leader);
common.rpc_caller().broadcast_append_entries(slice);
}
fn broadcast_empty_entries(&mut self, common: &mut Common<IO>) {
let head = common.log().tail();
let entries = Vec::new();
let slice = LogSuffix { head, entries };
self.broadcast_slice(common, slice);
}
fn handle_committed_log(&mut self, common: &mut Common<IO>) -> Result<()> {
let committed = self.followers.committed_log_tail();
if committed < self.commit_lower_bound {
return Ok(());
}
let old = common.log().committed_tail();
if old.index == committed {
return Ok(());
}
track_assert!(
old.index < committed,
ErrorKind::InconsistentState,
"old={:?}, committed={:?}",
old,
committed
);
track!(common.handle_log_committed(committed))?;
Ok(())
}
}