use futures::{Async, Poll, Stream};
use std::time::Instant;
pub use self::common::Common;
use self::candidate::Candidate;
use self::common::HandleMessageResult;
use self::follower::Follower;
use self::leader::Leader;
use self::loader::Loader;
use cluster::ClusterConfig;
use message::Message;
use metrics::NodeStateMetrics;
use node::NodeId;
use {Error, Event, Io, Result};
mod candidate;
mod common;
mod follower;
mod leader;
mod loader;
/// Optional role to switch to.
///
/// In this file, a `Some(role)` produced by a handler is passed to
/// `NodeState::handle_role_change`, while `None` leaves the current role in place.
type NextState<IO> = Option<RoleState<IO>>;
/// State of a single node: the role-independent part plus the current role.
///
/// It is driven through its `Stream` implementation, which yields `Event`s.
pub struct NodeState<IO: Io> {
/// Role-independent state; every role handler receives it as `&mut Common<IO>`.
pub common: Common<IO>,
/// The role the node is currently in.
pub role: RoleState<IO>,
/// Start of the interval measured for the role-transition duration metrics;
/// reset whenever the role changes to a different variant.
started_at: Instant,
/// Metrics handle used for transition durations and the timeout counter.
pub metrics: NodeStateMetrics,
}
impl<IO: Io> NodeState<IO> {
    /// Builds a node that starts in the `Loader` role.
    ///
    /// A clone of `metrics` is handed to the shared `Common` state and the
    /// original is kept on the returned value. The transition clock
    /// (`started_at`) begins at construction time.
    pub fn load(node_id: NodeId, config: ClusterConfig, io: IO, metrics: NodeStateMetrics) -> Self {
        let mut common = Common::new(node_id, io, config, metrics.clone());
        let loader = Loader::new(&mut common);
        NodeState {
            common,
            role: RoleState::Loader(loader),
            started_at: Instant::now(),
            metrics,
        }
    }

    /// Returns `true` while the node is still in the `Loader` role.
    pub fn is_loading(&self) -> bool {
        match self.role {
            RoleState::Loader(_) => true,
            _ => false,
        }
    }

    /// Switches a follower to the candidate role.
    ///
    /// Does nothing unless the node is currently a `Follower`.
    pub fn start_election(&mut self) {
        match self.role {
            RoleState::Follower(_) => {}
            _ => return,
        }
        let candidate = self.common.transit_to_candidate();
        self.handle_role_change(candidate);
    }

    /// Forwards a timeout to the handler of the current role.
    ///
    /// Returns the role to switch to, if the handler requests one.
    fn handle_timeout(&mut self) -> Result<Option<RoleState<IO>>> {
        match &mut self.role {
            RoleState::Loader(loader) => track!(loader.handle_timeout(&mut self.common)),
            RoleState::Follower(follower) => track!(follower.handle_timeout(&mut self.common)),
            RoleState::Candidate(candidate) => track!(candidate.handle_timeout(&mut self.common)),
            RoleState::Leader(leader) => track!(leader.handle_timeout(&mut self.common)),
        }
    }

    /// Processes one received message.
    ///
    /// While the node is loading, the message is not processed at all (it is
    /// dropped here) and `Ok(None)` is returned. Otherwise the shared
    /// `Common` handler gets the first chance; only messages it leaves
    /// unhandled reach the handler of the current role.
    fn handle_message(&mut self, message: Message) -> Result<Option<RoleState<IO>>> {
        if self.role.is_loader() {
            return Ok(None);
        }

        let unhandled = match self.common.handle_message(message) {
            HandleMessageResult::Handled(next) => return Ok(next),
            HandleMessageResult::Unhandled(unhandled) => unhandled,
        };

        match &mut self.role {
            // Excluded by the guard at the top of this function.
            RoleState::Loader(_) => unreachable!(),
            RoleState::Follower(follower) => {
                track!(follower.handle_message(&mut self.common, unhandled))
            }
            RoleState::Candidate(candidate) => {
                track!(candidate.handle_message(&mut self.common, &unhandled))
            }
            RoleState::Leader(leader) => {
                track!(leader.handle_message(&mut self.common, unhandled))
            }
        }
    }

    /// Installs `next` as the current role and updates the transition metrics.
    ///
    /// Three transitions record the time elapsed since `started_at` in a
    /// dedicated duration metric: candidate -> leader, candidate -> follower
    /// and loader -> candidate. A "transition" to the same role variant
    /// leaves the clock untouched; every other transition restarts it.
    fn handle_role_change(&mut self, next: RoleState<IO>) {
        let measured_from = self.started_at;
        let seconds_elapsed =
            || prometrics::timestamp::duration_to_seconds(measured_from.elapsed());

        let restart_clock = match (&self.role, &next) {
            (RoleState::Candidate(_), RoleState::Leader(_)) => {
                self.metrics
                    .candidate_to_leader_duration_seconds
                    .observe(seconds_elapsed());
                true
            }
            (RoleState::Candidate(_), RoleState::Follower(_)) => {
                self.metrics
                    .candidate_to_follower_duration_seconds
                    .observe(seconds_elapsed());
                true
            }
            (RoleState::Loader(_), RoleState::Candidate(_)) => {
                self.metrics
                    .loader_to_candidate_duration_seconds
                    .observe(seconds_elapsed());
                true
            }
            // Same role variant on both sides: keep measuring from the old start.
            (RoleState::Leader(_), RoleState::Leader(_))
            | (RoleState::Follower(_), RoleState::Follower(_))
            | (RoleState::Candidate(_), RoleState::Candidate(_))
            | (RoleState::Loader(_), RoleState::Loader(_)) => false,
            _ => true,
        };

        if restart_clock {
            self.started_at = Instant::now();
        }
        self.role = next;
    }
}
impl<IO: Io> Stream for NodeState<IO> {
type Item = Event;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut did_something = true;
while did_something {
did_something = false;
if let Some(e) = self.common.next_event() {
return Ok(Async::Ready(Some(e)));
}
if let Async::Ready(()) = track!(self.common.poll_timeout())? {
did_something = true;
self.metrics.poll_timeout_total.increment();
if let Some(next) = track!(self.handle_timeout())? {
self.handle_role_change(next);
}
if let Some(e) = self.common.next_event() {
return Ok(Async::Ready(Some(e)));
}
}
if let Some(next) = track!(self.common.run_once())? {
did_something = true;
self.handle_role_change(next);
}
if let Some(e) = self.common.next_event() {
return Ok(Async::Ready(Some(e)));
}
let result = match self.role {
RoleState::Loader(ref mut t) => track!(t.run_once(&mut self.common))?,
RoleState::Follower(ref mut t) => track!(t.run_once(&mut self.common))?,
RoleState::Candidate(ref mut t) => track!(t.run_once(&mut self.common))?,
RoleState::Leader(ref mut t) => track!(t.run_once(&mut self.common))?,
};
if let Some(next) = result {
did_something = true;
self.handle_role_change(next);
}
if let Some(e) = self.common.next_event() {
return Ok(Async::Ready(Some(e)));
}
if let Some(message) = track!(self.common.try_recv_message())? {
did_something = true;
if let Some(next) = track!(self.handle_message(message))? {
self.handle_role_change(next);
}
if let Some(e) = self.common.next_event() {
return Ok(Async::Ready(Some(e)));
}
}
}
Ok(Async::NotReady)
}
}
/// The role a node is currently in, together with that role's own state.
pub enum RoleState<IO: Io> {
/// Initial role assigned by `NodeState::load`.
Loader(Loader<IO>),
/// Follower role; the only role from which `NodeState::start_election` acts.
Follower(Follower<IO>),
/// Candidate role.
Candidate(Candidate<IO>),
/// Leader role.
Leader(Leader<IO>),
}
impl<IO: Io> RoleState<IO> {
    /// Returns `true` if this is the `Loader` variant.
    pub fn is_loader(&self) -> bool {
        match *self {
            RoleState::Loader(_) => true,
            _ => false,
        }
    }

    /// Returns `true` if this is the `Candidate` variant (test builds only).
    #[cfg(test)]
    pub fn is_candidate(&self) -> bool {
        match *self {
            RoleState::Candidate(_) => true,
            _ => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use prometrics::metrics::MetricBuilder;
    use test_util::tests::TestIoBuilder;

    /// Creates the metrics handle shared by all tests in this module.
    fn new_metrics() -> NodeStateMetrics {
        NodeStateMetrics::new(&MetricBuilder::new()).expect("Never fails")
    }

    /// A freshly loaded node reports that it is loading.
    #[test]
    fn node_state_is_loading_works() {
        let metrics = new_metrics();
        let io = TestIoBuilder::new().finish();
        let cluster_config = io.cluster.clone();

        let node = NodeState::load("test".into(), cluster_config, io, metrics);

        assert!(node.is_loading());
    }

    /// The `Loader` variant is a loader and not a candidate.
    #[test]
    fn role_state_is_loader_works() {
        let metrics = new_metrics();
        let io = TestIoBuilder::new().finish();
        let cluster_config = io.cluster.clone();
        let mut common = Common::new("test".into(), io, cluster_config, metrics);

        let role = RoleState::Loader(Loader::new(&mut common));

        assert!(role.is_loader());
        assert!(!role.is_candidate());
    }

    /// The `Candidate` variant is a candidate and not a loader.
    #[test]
    fn role_state_is_candidate_works() {
        let metrics = new_metrics();
        let io = TestIoBuilder::new().finish();
        let cluster_config = io.cluster.clone();
        let mut common = Common::new("test".into(), io, cluster_config, metrics);

        let role = RoleState::Candidate(Candidate::new(&mut common));

        assert!(!role.is_loader());
        assert!(role.is_candidate());
    }
}