use futures::{Async, Future};
use super::super::{Common, NextState, RoleState};
use super::{Follower, FollowerIdle};
use log::LogPosition;
use message::{AppendEntriesCall, Message};
use {Io, Result};
/// Follower sub-state that handles one `AppendEntriesCall` by (possibly)
/// saving the received log suffix and then replying to the caller.
pub struct FollowerAppend<IO>
where
    IO: Io,
{
    /// In-flight save of the message's suffix, or `None` when `new` decided
    /// that the suffix does not reach past the local log's tail index.
    future: Option<IO::SaveLog>,

    /// The suffix's tail, unless the local log's tail has a strictly larger
    /// index, in which case it is the local log's tail.
    new_log_tail: LogPosition,

    /// The call being processed; its `committed_log_tail` is adjusted by `new`.
    message: AppendEntriesCall,
}
impl<IO: Io> FollowerAppend<IO> {
    /// Creates the append state for `message`.
    ///
    /// Three things are decided up front:
    ///
    /// * `new_log_tail` becomes the suffix's tail, or the local log's tail
    ///   when the latter has a strictly larger index.
    /// * `message.committed_log_tail` is first lowered to the suffix's tail
    ///   index if it lies beyond it, then raised to the locally committed
    ///   index if it lies below it.
    /// * A save of the suffix is started only when `new_log_tail` has a
    ///   different index than the local log's tail; otherwise no I/O is issued.
    pub fn new(common: &mut Common<IO>, mut message: AppendEntriesCall) -> Self {
        let suffix_tail = message.suffix.tail();
        let suffix_tail_index = suffix_tail.index;
        let local_tail = common.log().tail();
        let local_tail_index = local_tail.index;

        // Keep the local tail only when it is strictly ahead of the suffix.
        let new_log_tail = if suffix_tail_index < local_tail_index {
            local_tail
        } else {
            suffix_tail
        };

        // Upper bound first, lower bound second: the order matters when the
        // locally committed index is larger than the suffix's tail index.
        if suffix_tail_index < message.committed_log_tail {
            message.committed_log_tail = suffix_tail_index;
        }
        let local_committed_index = common.log_committed_tail().index;
        if message.committed_log_tail < local_committed_index {
            message.committed_log_tail = local_committed_index;
        }

        let future = if new_log_tail.index != local_tail_index {
            Some(common.save_log_suffix(&message.suffix))
        } else {
            None
        };

        FollowerAppend {
            future,
            new_log_tail,
            message,
        }
    }

    /// Handles a message that arrives while this state is active.
    ///
    /// An `AppendEntriesCall` is answered with a "busy" reply; every other
    /// message is ignored. This never requests a state transition.
    pub fn handle_message(
        &mut self,
        common: &mut Common<IO>,
        message: Message,
    ) -> Result<NextState<IO>> {
        if let Message::AppendEntriesCall(call) = message {
            common.rpc_callee(&call.header).reply_busy();
        }
        Ok(None)
    }

    /// Polls the pending save and, once it has finished, completes the call.
    ///
    /// On completion this:
    ///
    /// 1. calls `handle_log_appended` with the suffix, but only if
    ///    `new_log_tail` equals the suffix's tail;
    /// 2. calls `handle_log_committed` with the adjusted commit index;
    /// 3. replies to the caller with the suffix's tail;
    /// 4. returns a transition to `FollowerIdle`.
    ///
    /// While the save is still pending, `Ok(None)` is returned. Errors from
    /// the poll or from either handler are propagated.
    pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<NextState<IO>> {
        // NOTE(review): when `future` is `None` this relies on the `Future`
        // impl for `Option<F>` resolving immediately — confirm against the
        // futures version in use.
        match track!(self.future.poll())? {
            Async::NotReady => Ok(None),
            Async::Ready(_) => {
                let suffix_tail = self.message.suffix.tail();
                if self.new_log_tail == suffix_tail {
                    track!(common.handle_log_appended(&self.message.suffix))?;
                }
                track!(common.handle_log_committed(self.message.committed_log_tail))?;

                common
                    .rpc_callee(&self.message.header)
                    .reply_append_entries(suffix_tail);

                Ok(Some(RoleState::Follower(Follower::Idle(
                    FollowerIdle::new(),
                ))))
            }
        }
    }
}