use futures::{Async, Future};
use std::collections::BTreeMap;
use std::mem;
use trackable::error::ErrorKindExt;
use super::super::Common;
use cluster::ClusterConfig;
use log::{Log, LogIndex};
use message::{AppendEntriesReply, SequenceNumber};
use node::NodeId;
use {ErrorKind, Io, Result};
pub struct FollowersManager<IO: Io> {
followers: BTreeMap<NodeId, Follower>,
config: ClusterConfig,
latest_hearbeat_ack: SequenceNumber,
last_broadcast_seq_no: SequenceNumber,
tasks: BTreeMap<NodeId, IO::LoadLog>,
}
impl<IO: Io> FollowersManager<IO> {
pub fn new(config: ClusterConfig) -> Self {
let followers = config
.members()
.map(|n| (n.clone(), Follower::new()))
.collect();
FollowersManager {
followers,
config,
tasks: BTreeMap::new(),
latest_hearbeat_ack: SequenceNumber::new(0),
last_broadcast_seq_no: SequenceNumber::new(0),
}
}
pub fn run_once(&mut self, common: &mut Common<IO>) -> Result<()> {
let mut dones = Vec::new();
for (follower, task) in &mut self.tasks {
if let Async::Ready(log) = track!(task.poll())? {
dones.push((follower.clone(), log));
}
}
for (follower, log) in dones {
let rpc = common.rpc_caller();
match log {
Log::Prefix(snapshot) => rpc.send_install_snapshot(&follower, snapshot),
Log::Suffix(slice) => rpc.send_append_entries(&follower, slice),
}
self.tasks.remove(&follower);
}
Ok(())
}
pub fn latest_hearbeat_ack(&self) -> SequenceNumber {
self.latest_hearbeat_ack
}
pub fn committed_log_tail(&self) -> LogIndex {
self.config.consensus_value(|node_id| {
let f = &self.followers[node_id];
if f.synced {
f.log_tail
} else {
LogIndex::new(0)
}
})
}
pub fn joint_committed_log_tail(&self) -> LogIndex {
self.config.full_consensus_value(|node_id| {
let f = &self.followers[node_id];
if f.synced {
f.log_tail
} else {
LogIndex::new(0)
}
})
}
pub fn handle_append_entries_reply(
&mut self,
common: &Common<IO>,
reply: &AppendEntriesReply,
) -> bool {
let updated = self.update_follower_state(common, reply);
if self.latest_hearbeat_ack < reply.header.seq_no {
self.latest_hearbeat_ack = self
.config
.consensus_value(|node_id| self.followers[node_id].last_seq_no);
}
updated
}
pub fn set_last_broadcast_seq_no(&mut self, seq_no: SequenceNumber) {
self.last_broadcast_seq_no = seq_no;
}
pub fn log_sync(&mut self, common: &mut Common<IO>, reply: &AppendEntriesReply) -> Result<()> {
if reply.busy || self.tasks.contains_key(&reply.header.sender) {
return Ok(());
}
let follower = track!(self
.followers
.get_mut(&reply.header.sender)
.ok_or_else(|| ErrorKind::InconsistentState.error()))?;
if reply.header.seq_no <= follower.obsolete_seq_no {
return Ok(());
}
follower.obsolete_seq_no = self.last_broadcast_seq_no;
if common.log().tail().index <= follower.log_tail {
return Ok(());
}
let end = if follower.synced {
common.log().tail().index
} else {
follower.log_tail
};
let future = common.load_log(follower.log_tail, Some(end));
self.tasks.insert(reply.header.sender.clone(), future);
Ok(())
}
pub fn handle_config_updated(&mut self, config: &ClusterConfig) {
for id in config.members() {
if !self.followers.contains_key(id) {
self.followers.insert(id.clone(), Follower::new());
}
}
self.followers = mem::replace(&mut self.followers, BTreeMap::new())
.into_iter()
.filter(|&(ref id, _)| config.is_known_node(id))
.collect();
self.config = config.clone();
}
fn update_follower_state(&mut self, common: &Common<IO>, reply: &AppendEntriesReply) -> bool {
let follower = &mut self
.followers
.get_mut(&reply.header.sender)
.expect("Never fails");
if follower.last_seq_no < reply.header.seq_no {
follower.last_seq_no = reply.header.seq_no;
}
match *reply {
AppendEntriesReply { busy: true, .. } => false,
AppendEntriesReply { log_tail, .. } if follower.synced => {
let updated = follower.log_tail < log_tail.index;
if updated {
follower.log_tail = log_tail.index;
} else if log_tail.index.as_u64() == 0 && follower.log_tail.as_u64() != 0 {
follower.synced = false;
}
updated
}
AppendEntriesReply { log_tail, .. } => {
let leader_term = common
.log()
.get_record(log_tail.index)
.map(|r| r.head.prev_term);
follower.synced = leader_term == Some(log_tail.prev_term);
if follower.synced {
follower.log_tail = log_tail.index;
} else {
follower.log_tail = log_tail.index.as_u64().saturating_sub(1).into();
}
follower.synced
}
}
}
}
#[derive(Debug)]
struct Follower {
pub obsolete_seq_no: SequenceNumber,
pub log_tail: LogIndex,
pub last_seq_no: SequenceNumber,
pub synced: bool,
}
impl Follower {
pub fn new() -> Self {
Follower {
obsolete_seq_no: SequenceNumber::new(0),
log_tail: LogIndex::new(0),
last_seq_no: SequenceNumber::new(0),
synced: false,
}
}
}