use futures::{Poll, Stream};
use prometrics::metrics::MetricBuilder;
use std::sync::Arc;
use trackable::error::ErrorKindExt;
use cluster::{ClusterConfig, ClusterMembers};
use election::{Ballot, Role};
use io::Io;
use log::{LogEntry, LogHistory, LogIndex, LogPosition, LogPrefix, ProposalId};
use message::SequenceNumber;
use metrics::RaftlogMetrics;
use node::{Node, NodeId};
use node_state::{NodeState, RoleState};
use {Error, ErrorKind, Result};
/// Handle to the replicated log of the local node.
///
/// Wraps the node's state machine (`NodeState`) together with the metrics
/// collected for it. All I/O is delegated to the `IO` implementation that
/// was handed to the constructor.
pub struct ReplicatedLog<IO>
where
    IO: Io,
{
    /// State of the local node (role-specific state plus the common part).
    node: NodeState<IO>,
    /// Metrics for this log, shared via `Arc` so callers can hold on to them.
    metrics: Arc<RaftlogMetrics>,
}
impl<IO> ReplicatedLog<IO>
where
    IO: Io,
{
    /// Creates a `ReplicatedLog` for the local node `node_id`.
    ///
    /// The cluster configuration is built from `members`, the metrics are
    /// registered under the `raftlog` namespace on a *clone* of
    /// `metric_builder` (the caller's builder is left untouched), and the node
    /// state is obtained through `NodeState::load`.
    ///
    /// # Errors
    ///
    /// Returns an error when `RaftlogMetrics::new` fails.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        node_id: NodeId,
        members: ClusterMembers,
        io: IO,
        metric_builder: &MetricBuilder,
    ) -> Result<Self> {
        let config = ClusterConfig::new(members);

        let mut namespaced = metric_builder.clone();
        namespaced.namespace("raftlog");
        let metrics = track!(RaftlogMetrics::new(&namespaced))?;

        let node = NodeState::load(node_id, config, io, metrics.node_state.clone());
        let metrics = Arc::new(metrics);
        Ok(ReplicatedLog { node, metrics })
    }

    /// Returns the metrics associated with this log.
    pub fn metrics(&self) -> &Arc<RaftlogMetrics> {
        &self.metrics
    }

    /// Proposes `command` as a new log entry stamped with the current term.
    ///
    /// On success the identifier returned by the leader's `propose` is handed
    /// back to the caller.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorKind::NotLeader` when the local node is not in the
    /// `Leader` role state.
    pub fn propose_command(&mut self, command: Vec<u8>) -> Result<ProposalId> {
        let leader = match self.node.role {
            RoleState::Leader(ref mut leader) => leader,
            // `track_panic!` returns early with a tracked error; it does not panic.
            _ => track_panic!(ErrorKind::NotLeader),
        };
        let entry = LogEntry::Command {
            term: self.node.common.term(),
            command,
        };
        Ok(leader.propose(&mut self.node.common, entry))
    }

    /// Proposes a change of the cluster membership to `new_members`.
    ///
    /// The new configuration is derived from the current one via
    /// `start_config_change` and proposed as a `LogEntry::Config` entry
    /// stamped with the current term.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorKind::NotLeader` when the local node is not in the
    /// `Leader` role state.
    pub fn propose_config(&mut self, new_members: ClusterMembers) -> Result<ProposalId> {
        let leader = match self.node.role {
            RoleState::Leader(ref mut leader) => leader,
            // `track_panic!` returns early with a tracked error; it does not panic.
            _ => track_panic!(ErrorKind::NotLeader),
        };
        // The configuration is computed before the term is read, as before.
        let config = self.node.common.config().start_config_change(new_members);
        let entry = LogEntry::Config {
            term: self.node.common.term(),
            config,
        };
        Ok(leader.propose(&mut self.node.common, entry))
    }

    /// Sends a heartbeat through the leader and returns the sequence number
    /// produced by `heartbeat_syn`.
    ///
    /// # Errors
    ///
    /// Fails with `ErrorKind::NotLeader` when the local node is not in the
    /// `Leader` role state.
    pub fn heartbeat(&mut self) -> Result<SequenceNumber> {
        let leader = match self.node.role {
            RoleState::Leader(ref mut leader) => leader,
            // `track_panic!` returns early with a tracked error; it does not panic.
            _ => track_panic!(ErrorKind::NotLeader),
        };
        Ok(leader.heartbeat_syn(&mut self.node.common))
    }

    /// Installs `snapshot` as the log prefix ending at `new_head`.
    ///
    /// The previous term and the cluster configuration stored in the prefix
    /// are taken from the local log's record for `new_head`.
    ///
    /// # Errors
    ///
    /// - `ErrorKind::Busy` while the node state is still loading.
    /// - `ErrorKind::InvalidInput` when the local log has no record for
    ///   `new_head`.
    /// - Any error reported by the underlying `install_snapshot` call.
    pub fn install_snapshot(&mut self, new_head: LogIndex, snapshot: Vec<u8>) -> Result<()> {
        track_assert!(
            !self.node.is_loading(),
            ErrorKind::Busy,
            "Loading node state"
        );

        // Build the prefix inside its own scope so that whatever `get_record`
        // hands out is released before `common` is used mutably below.
        let prefix = {
            let record = track!(
                self.node
                    .common
                    .log()
                    .get_record(new_head)
                    .ok_or_else(|| ErrorKind::InvalidInput.error()),
                "Too old log position: new_head={:?}, current_head={:?}, node={:?}",
                new_head,
                self.local_history().head(),
                self.local_node()
            )?;
            LogPrefix {
                tail: LogPosition {
                    prev_term: record.head.prev_term,
                    index: new_head,
                },
                config: record.config.clone(),
                snapshot,
            }
        };

        track!(self.node.common.install_snapshot(prefix))?;
        Ok(())
    }

    /// Asks the local node to start an election.
    pub fn start_election(&mut self) {
        self.node.start_election();
    }

    /// Returns the local node as seen by the common node state.
    pub fn local_node(&self) -> &Node {
        self.node.common.local_node()
    }

    /// Returns the history of the local log.
    pub fn local_history(&self) -> &LogHistory {
        self.node.common.log()
    }

    /// Returns the length of the leader's proposal queue, or `0` when the
    /// local node is not the leader.
    pub fn proposal_queue_len(&self) -> usize {
        match self.node.role {
            RoleState::Leader(ref leader) => leader.proposal_queue_len(&self.node.common),
            _ => 0,
        }
    }

    /// Returns `true` while the node state is loading or a snapshot
    /// installation is in progress.
    pub fn is_snapshot_installing(&self) -> bool {
        if self.node.is_loading() {
            return true;
        }
        self.node.common.is_snapshot_installing()
    }

    /// Returns the sequence number of the last acknowledged heartbeat as
    /// reported by the leader, or `SequenceNumber::new(0)` when the local
    /// node is not the leader.
    pub fn last_heartbeat_ack(&self) -> SequenceNumber {
        match self.node.role {
            RoleState::Leader(ref leader) => leader.last_heartbeat_ack(),
            _ => SequenceNumber::new(0),
        }
    }

    /// Returns the current cluster configuration.
    pub fn cluster_config(&self) -> &ClusterConfig {
        self.node.common.config()
    }

    /// Returns a shared reference to the I/O implementation.
    pub fn io(&self) -> &IO {
        self.node.common.io()
    }

    /// Returns a mutable reference to the I/O implementation.
    ///
    /// # Safety
    ///
    /// NOTE(review): the invariant the caller must uphold is not visible in
    /// this file. Presumably mutating the I/O layer behind the log's back can
    /// break the log's consistency; confirm against the `Io` contract.
    pub unsafe fn io_mut(&mut self) -> &mut IO {
        self.node.common.io_mut()
    }
}
/// Polling the log drives the underlying node state and yields `Event`s.
impl<IO> Stream for ReplicatedLog<IO>
where
    IO: Io,
{
    type Item = Event;
    type Error = Error;

    /// Polls the node state; on failure the error is tracked together with
    /// a debug rendering of the local node.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let polled = self.node.poll();
        track!(polled, "node={:?}", self.local_node())
    }
}
/// Events yielded when a `ReplicatedLog` is polled as a `Stream`.
///
/// NOTE(review): the variant descriptions below are derived from the variant
/// and field names; the code that emits them is not part of this file.
#[derive(Debug, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum Event {
/// A role changed; `new_role` is the role after the change
/// (presumably that of the local node).
RoleChanged { new_role: Role },
/// The term changed; `new_ballot` is the ballot after the change.
TermChanged { new_ballot: Ballot },
/// A new leader has been elected.
NewLeaderElected,
/// The log entry `entry` at position `index` has been committed.
Committed { index: LogIndex, entry: LogEntry },
/// A snapshot has been loaded; `new_head` is the resulting head position
/// of the log and `snapshot` holds the snapshot bytes.
SnapshotLoaded {
new_head: LogPosition,
snapshot: Vec<u8>,
},
/// A snapshot has been installed; `new_head` is the resulting head
/// position of the log.
SnapshotInstalled { new_head: LogPosition },
}