use std::marker::PhantomData;
use trackable::error::ErrorKindExt;
use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerAppend, FollowerSnapshot};
use log::{LogPosition, LogSuffix};
use message::{AppendEntriesCall, Message};
use {ErrorKind, Io, Result};
pub struct FollowerIdle<IO: Io> {
_phantom: PhantomData<IO>,
}
impl<IO: Io> FollowerIdle<IO> {
pub fn new() -> Self {
FollowerIdle {
_phantom: PhantomData,
}
}
#[allow(clippy::if_same_then_else)]
pub fn handle_message(
&mut self,
common: &mut Common<IO>,
message: Message,
) -> Result<NextState<IO>> {
match message {
Message::AppendEntriesCall(m) => track!(self.handle_entries(common, m)),
Message::InstallSnapshotCast(m) => {
if m.prefix.tail.index <= common.log_committed_tail().index {
Ok(None)
} else if common.is_snapshot_installing() {
Ok(None)
} else {
track!(common.install_snapshot(m.prefix))?;
let next = FollowerSnapshot::new();
Ok(Some(RoleState::Follower(Follower::Snapshot(next))))
}
}
_ => Ok(None),
}
}
fn handle_entries(
&mut self,
common: &mut Common<IO>,
mut message: AppendEntriesCall,
) -> Result<NextState<IO>> {
let local_tail = common.log().tail();
if message.suffix.tail().index < common.log().head().index {
common
.rpc_callee(&message.header)
.reply_append_entries(local_tail); return Ok(None);
}
if message.suffix.head.index < common.log().head().index {
track!(message.suffix.skip_to(common.log().head().index))?;
}
if local_tail.index < message.suffix.head.index {
common
.rpc_callee(&message.header)
.reply_append_entries(local_tail);
Ok(None)
} else {
track!(self.handle_non_disjoint_entries(common, message))
}
}
fn handle_non_disjoint_entries(
&mut self,
common: &mut Common<IO>,
mut message: AppendEntriesCall,
) -> Result<NextState<IO>> {
let (matched, lcp) = track!(self.longest_common_prefix(common, &message.suffix))?;
if !matched {
let new_log_tail = lcp;
track!(common.handle_log_rollbacked(new_log_tail))?;
common
.rpc_callee(&message.header)
.reply_append_entries(new_log_tail);
Ok(None)
} else {
track!(message.suffix.skip_to(lcp.index))?;
let next = FollowerAppend::new(common, message);
Ok(Some(RoleState::Follower(Follower::Append(next))))
}
}
fn longest_common_prefix(
&self,
common: &Common<IO>,
suffix: &LogSuffix,
) -> Result<(bool, LogPosition)> {
for LogPosition { prev_term, index } in suffix.positions() {
let record = track!(common
.log()
.get_record(index)
.ok_or_else(|| ErrorKind::InconsistentState.error()))?;
let local_prev_term = record.head.prev_term;
if prev_term != local_prev_term {
let mut lcp = track!(common
.log()
.get_record(index - 1)
.ok_or_else(|| ErrorKind::InconsistentState.error()))?
.head;
lcp.index = index - 1;
return Ok((false, lcp));
}
if index == common.log().tail().index {
return Ok((true, common.log().tail()));
}
}
Ok((true, suffix.tail()))
}
}