use futures::{Async, Future, Poll};
use std::collections::VecDeque;
use self::rpc_builder::{RpcCallee, RpcCaller};
use super::candidate::Candidate;
use super::follower::Follower;
use super::leader::Leader;
use super::{NextState, RoleState};
use cluster::ClusterConfig;
use election::{Ballot, Role, Term};
use log::{Log, LogHistory, LogIndex, LogPosition, LogPrefix, LogSuffix};
use message::{Message, MessageHeader, SequenceNumber};
use metrics::NodeStateMetrics;
use node::{Node, NodeId};
use {Error, ErrorKind, Event, Io, Result};
mod rpc_builder;
pub struct Common<IO: Io> {
local_node: Node,
history: LogHistory,
timeout: IO::Timeout,
events: VecDeque<Event>,
io: IO,
unread_message: Option<Message>,
seq_no: SequenceNumber,
load_committed: Option<IO::LoadLog>,
install_snapshot: Option<InstallSnapshot<IO>>,
metrics: NodeStateMetrics,
}
impl<IO> Common<IO>
where
IO: Io,
{
pub fn new(
node_id: NodeId,
mut io: IO,
config: ClusterConfig,
metrics: NodeStateMetrics,
) -> Self {
let timeout = io.create_timeout(Role::Follower);
Common {
local_node: Node::new(node_id),
io,
history: LogHistory::new(config),
unread_message: None,
seq_no: SequenceNumber::new(0),
timeout,
events: VecDeque::new(),
load_committed: None,
install_snapshot: None,
metrics,
}
}
pub fn config(&self) -> &ClusterConfig {
self.history.config()
}
pub fn log(&self) -> &LogHistory {
&self.history
}
pub fn log_committed_tail(&self) -> LogPosition {
self.history.committed_tail()
}
pub fn term(&self) -> Term {
self.local_node.ballot.term
}
pub fn local_node(&self) -> &Node {
&self.local_node
}
pub fn handle_log_appended(&mut self, suffix: &LogSuffix) -> Result<()> {
track!(self.history.record_appended(suffix))
}
pub fn handle_log_committed(&mut self, new_tail: LogIndex) -> Result<()> {
track!(self.history.record_committed(new_tail))
}
pub fn handle_log_rollbacked(&mut self, new_tail: LogPosition) -> Result<()> {
track!(self.history.record_rollback(new_tail))
}
pub fn handle_log_snapshot_installed(
&mut self,
new_head: LogPosition,
config: ClusterConfig,
) -> Result<()> {
track!(self.history.record_snapshot_installed(new_head, config))
}
pub fn handle_log_snapshot_loaded(&mut self, prefix: LogPrefix) -> Result<()> {
if self.history.committed_tail().index < prefix.tail.index {
track!(self
.history
.record_snapshot_installed(prefix.tail, prefix.config.clone(),))?;
}
track!(self.history.record_snapshot_loaded(&prefix))?;
let event = Event::SnapshotLoaded {
new_head: prefix.tail,
snapshot: prefix.snapshot,
};
self.metrics.event_queue_len.increment();
self.events.push_back(event);
Ok(())
}
pub fn set_ballot(&mut self, new_ballot: Ballot) {
if self.local_node.ballot != new_ballot {
self.local_node.ballot = new_ballot.clone();
self.metrics.event_queue_len.increment();
self.events.push_back(Event::TermChanged { new_ballot });
}
}
pub fn is_snapshot_installing(&self) -> bool {
self.install_snapshot.is_some()
}
pub fn is_focusing_on_installing_snapshot(&self) -> bool {
if let Some(ref snapshot) = self.install_snapshot {
return self.log().tail().index < snapshot.summary.tail.index;
}
false
}
pub fn transit_to_leader(&mut self) -> RoleState<IO> {
self.metrics.transit_to_leader_total.increment();
self.set_role(Role::Leader);
self.notify_new_leader_elected();
RoleState::Leader(Leader::new(self))
}
pub fn transit_to_candidate(&mut self) -> RoleState<IO> {
self.metrics.transit_to_candidate_total.increment();
let new_ballot = Ballot {
term: (self.local_node.ballot.term.as_u64() + 1).into(),
voted_for: self.local_node.id.clone(),
};
self.set_ballot(new_ballot);
self.set_role(Role::Candidate);
RoleState::Candidate(Candidate::new(self))
}
pub fn transit_to_follower(
&mut self,
followee: NodeId,
pending_vote: Option<MessageHeader>,
) -> RoleState<IO> {
self.metrics.transit_to_follower_total.increment();
let new_ballot = Ballot {
term: self.local_node.ballot.term,
voted_for: followee,
};
self.set_ballot(new_ballot);
self.set_role(Role::Follower);
self.notify_new_leader_elected();
RoleState::Follower(Follower::new(self, pending_vote))
}
pub fn notify_new_leader_elected(&mut self) {
self.events.push_back(Event::NewLeaderElected);
}
pub fn next_seq_no(&self) -> SequenceNumber {
self.seq_no
}
pub fn io(&self) -> &IO {
&self.io
}
pub unsafe fn io_mut(&mut self) -> &mut IO {
&mut self.io
}
pub fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> IO::LoadLog {
self.io.load_log(start, end)
}
pub fn save_log_suffix(&mut self, suffix: &LogSuffix) -> IO::SaveLog {
self.io.save_log_suffix(suffix)
}
pub fn save_ballot(&mut self) -> IO::SaveBallot {
self.io.save_ballot(self.local_node.ballot.clone())
}
pub fn load_ballot(&mut self) -> IO::LoadBallot {
self.io.load_ballot()
}
pub fn set_timeout(&mut self, role: Role) {
self.timeout = self.io.create_timeout(role);
}
pub fn poll_timeout(&mut self) -> Result<Async<()>> {
track!(self.timeout.poll())
}
pub fn next_event(&mut self) -> Option<Event> {
self.metrics.event_queue_len.decrement();
self.events.pop_front()
}
pub fn try_recv_message(&mut self) -> Result<Option<Message>> {
if let Some(message) = self.unread_message.take() {
Ok(Some(message))
} else {
track!(self.io.try_recv_message())
}
}
pub fn install_snapshot(&mut self, snapshot: LogPrefix) -> Result<()> {
track_assert!(
self.history.head().index <= snapshot.tail.index,
ErrorKind::InconsistentState
);
track_assert!(self.install_snapshot.is_none(), ErrorKind::Busy);
let future = InstallSnapshot::new(self, snapshot);
self.install_snapshot = Some(future);
Ok(())
}
pub fn handle_message(&mut self, message: Message) -> HandleMessageResult<IO> {
if self.local_node.role == Role::Leader
&& !self.config().is_known_node(&message.header().sender)
{
HandleMessageResult::Handled(None)
} else if message.header().term > self.local_node.ballot.term {
let is_follower = self.local_node.ballot.voted_for != self.local_node.id;
if is_follower && self.local_node.ballot.voted_for != message.header().sender {
return HandleMessageResult::Handled(None);
}
self.local_node.ballot.term = message.header().term;
let next_state = if let Message::RequestVoteCall(m) = message {
if m.log_tail.is_newer_or_equal_than(self.history.tail()) {
let candidate = m.header.sender.clone();
self.transit_to_follower(candidate, Some(m.header))
} else {
self.transit_to_candidate()
}
} else if let Message::AppendEntriesCall { .. } = message {
let leader = message.header().sender.clone();
self.unread_message = Some(message);
self.transit_to_follower(leader, None)
} else if self.local_node.role == Role::Leader {
self.transit_to_candidate()
} else {
let local = self.local_node.id.clone();
self.transit_to_follower(local, None)
};
HandleMessageResult::Handled(Some(next_state))
} else if message.header().term < self.local_node.ballot.term {
self.rpc_callee(message.header()).reply_request_vote(false);
HandleMessageResult::Handled(None)
} else {
match message {
Message::RequestVoteCall { .. } if !self.is_following_sender(&message) => {
self.rpc_callee(message.header()).reply_request_vote(false);
HandleMessageResult::Handled(None)
}
Message::AppendEntriesCall { .. } if !self.is_following_sender(&message) => {
let leader = message.header().sender.clone();
self.unread_message = Some(message);
let next = self.transit_to_follower(leader, None);
HandleMessageResult::Handled(Some(next))
}
_ => HandleMessageResult::Unhandled(message), }
}
}
pub fn run_once(&mut self) -> Result<NextState<IO>> {
loop {
if let Async::Ready(Some(summary)) = track!(self.install_snapshot.poll())? {
let SnapshotSummary {
tail: new_head,
config,
} = summary;
self.install_snapshot = None;
self.events.push_back(Event::SnapshotInstalled { new_head });
track!(self.history.record_snapshot_installed(new_head, config))?;
}
if let Async::Ready(Some(log)) = track!(self.load_committed.poll())? {
self.load_committed = None;
match log {
Log::Prefix(snapshot) => track!(self.handle_log_snapshot_loaded(snapshot))?,
Log::Suffix(slice) => track!(self.handle_committed(slice))?,
}
}
if self.load_committed.is_some()
|| self.history.consumed_tail().index == self.history.committed_tail().index
{
break;
}
let start = self.history.consumed_tail().index;
let end = self.history.committed_tail().index;
self.load_committed = Some(self.load_log(start, Some(end)));
}
Ok(None)
}
pub fn rpc_caller(&mut self) -> RpcCaller<IO> {
RpcCaller::new(self)
}
pub fn rpc_callee<'a>(&'a mut self, caller: &'a MessageHeader) -> RpcCallee<IO> {
RpcCallee::new(self, caller)
}
fn handle_committed(&mut self, suffix: LogSuffix) -> Result<()> {
let new_tail = suffix.tail();
for (index, entry) in (suffix.head.index.as_u64()..)
.map(LogIndex::new)
.zip(suffix.entries.into_iter())
{
let event = Event::Committed { index, entry };
self.events.push_back(event);
}
if new_tail.index >= self.log().head().index {
track!(self.history.record_consumed(new_tail.index))?;
}
Ok(())
}
fn set_role(&mut self, new_role: Role) {
if self.local_node.role != new_role {
self.local_node.role = new_role;
self.events.push_back(Event::RoleChanged { new_role });
}
}
fn is_following_sender(&self, message: &Message) -> bool {
self.local_node.ballot.voted_for == message.header().sender
}
}
pub enum HandleMessageResult<IO: Io> {
Handled(Option<RoleState<IO>>),
Unhandled(Message),
}
#[derive(Debug, Clone)]
struct SnapshotSummary {
tail: LogPosition,
config: ClusterConfig,
}
struct InstallSnapshot<IO: Io> {
future: IO::SaveLog,
summary: SnapshotSummary,
}
impl<IO: Io> InstallSnapshot<IO> {
pub fn new(common: &mut Common<IO>, prefix: LogPrefix) -> Self {
let summary = SnapshotSummary {
tail: prefix.tail,
config: prefix.config.clone(),
};
let future = common.io.save_log_prefix(prefix);
InstallSnapshot { future, summary }
}
}
impl<IO: Io> Future for InstallSnapshot<IO> {
type Item = SnapshotSummary;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(track!(self.future.poll())?.map(|()| self.summary.clone()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use prometrics::metrics::MetricBuilder;
use trackable::result::TestResult;
use log::{LogEntry, LogPrefix};
use metrics::NodeStateMetrics;
use test_util::tests::TestIoBuilder;
#[test]
fn is_snapshot_installing_works() -> TestResult {
let node_id: NodeId = "node1".into();
let metrics = track!(NodeStateMetrics::new(&MetricBuilder::new()))?;
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id, io, cluster.clone(), metrics);
let prefix = LogPrefix {
tail: LogPosition::default(),
config: cluster,
snapshot: Vec::default(),
};
assert!(!common.is_snapshot_installing());
common.install_snapshot(prefix)?;
assert!(common.is_snapshot_installing());
Ok(())
}
#[test]
fn is_focusing_on_installing_snapshot_works() -> TestResult {
let node_id: NodeId = "node1".into();
let metrics = track!(NodeStateMetrics::new(&MetricBuilder::new()))?;
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id, io, cluster.clone(), metrics);
let prev_term = Term::new(0);
let node_prefix = LogPrefix {
tail: LogPosition {
prev_term,
index: LogIndex::new(3),
},
config: cluster.clone(),
snapshot: vec![0],
};
let log_suffix = LogSuffix {
head: LogPosition {
prev_term,
index: LogIndex::new(3),
},
entries: vec![
LogEntry::Command {
term: prev_term,
command: Vec::default(),
},
LogEntry::Command {
term: prev_term,
command: Vec::default(),
},
LogEntry::Command {
term: prev_term,
command: Vec::default(),
},
],
};
let leader_prefix = LogPrefix {
tail: LogPosition {
prev_term,
index: LogIndex::new(5),
},
config: cluster,
snapshot: vec![1],
};
assert!(!common.is_focusing_on_installing_snapshot());
common.handle_log_snapshot_loaded(node_prefix)?;
common.install_snapshot(leader_prefix)?;
assert!(common.is_focusing_on_installing_snapshot());
common.handle_log_appended(&log_suffix)?;
assert_eq!(
common.log().tail(),
LogPosition {
prev_term,
index: LogIndex::new(6)
}
);
assert!(!common.is_focusing_on_installing_snapshot());
Ok(())
}
}