use std::fmt;
use std::sync::Arc;
use display_more::DisplayOptionExt;
use crate::Instant;
use crate::RaftTypeConfig;
use crate::core::ServerState;
use crate::display_ext::DisplayBTreeMapOptValue;
use crate::errors::Fatal;
use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::type_config::alias::InstantOf;
#[cfg(feature = "metrics-logids")]
use crate::type_config::alias::LogIdListOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::SerdeInstantOf;
use crate::type_config::alias::StoredMembershipOf;
use crate::type_config::alias::VoteOf;
use crate::vote::raft_vote::RaftVoteExt;
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftMetrics<C: RaftTypeConfig> {
pub running_state: Result<(), Fatal<C>>,
pub id: C::NodeId,
pub current_term: C::Term,
pub vote: VoteOf<C>,
pub last_log_index: Option<u64>,
pub committed: Option<LogIdOf<C>>,
pub last_applied: Option<LogIdOf<C>>,
pub snapshot: Option<LogIdOf<C>>,
pub purged: Option<LogIdOf<C>>,
#[cfg(feature = "metrics-logids")]
#[cfg_attr(feature = "serde", serde(skip))]
pub log_id_list: LogIdListOf<C>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,
pub last_quorum_acked: Option<SerdeInstantOf<C>>,
pub membership_config: Arc<StoredMembershipOf<C>>,
pub heartbeat: Option<HeartbeatMetrics<C>>,
pub replication: Option<ReplicationMetrics<C>>,
}
impl<C> fmt::Display for RaftMetrics<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Metrics{{")?;
write!(
f,
"id:{}, {:?}, term:{}, vote:{}, last_log:{}, committed:{}, last_applied:{}, leader:{}",
self.id,
self.state,
self.current_term,
self.vote,
self.last_log_index.display(),
self.committed.display(),
self.last_applied.display(),
self.current_leader.display(),
)?;
if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
"(quorum_acked_time:{}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, "(quorum_acked_time:None)")?;
}
write!(f, ", ")?;
write!(
f,
"membership:{}, snapshot:{}, purged:{}, replication:{{{}}}, heartbeat:{{{}}}",
self.membership_config,
self.snapshot.display(),
self.purged.display(),
self.replication.as_ref().map(DisplayBTreeMapOptValue).display(),
self.heartbeat.as_ref().map(DisplayBTreeMapOptValue).display(),
)?;
write!(f, "}}")?;
Ok(())
}
}
impl<C> RaftMetrics<C>
where C: RaftTypeConfig
{
pub fn new_initial(id: C::NodeId) -> Self {
let vote = VoteOf::<C>::new_with_default_term(id.clone());
#[allow(deprecated)]
Self {
running_state: Ok(()),
id,
current_term: Default::default(),
vote,
last_log_index: None,
committed: None,
last_applied: None,
snapshot: None,
purged: None,
#[cfg(feature = "metrics-logids")]
log_id_list: Default::default(),
state: ServerState::Follower,
current_leader: None,
millis_since_quorum_ack: None,
last_quorum_acked: None,
membership_config: Arc::new(StoredMembershipOf::<C>::default()),
replication: None,
heartbeat: None,
}
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftDataMetrics<C: RaftTypeConfig> {
pub last_log: Option<LogIdOf<C>>,
pub committed: Option<LogIdOf<C>>,
pub last_applied: Option<LogIdOf<C>>,
pub snapshot: Option<LogIdOf<C>>,
pub purged: Option<LogIdOf<C>>,
#[cfg(feature = "metrics-logids")]
#[cfg_attr(feature = "serde", serde(skip))]
pub log_id_list: LogIdListOf<C>,
#[deprecated(since = "0.10.0", note = "use `last_quorum_acked` instead.")]
pub millis_since_quorum_ack: Option<u64>,
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,
pub replication: Option<ReplicationMetrics<C>>,
pub heartbeat: Option<HeartbeatMetrics<C>>,
}
impl<C> fmt::Display for RaftDataMetrics<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DataMetrics{{")?;
write!(
f,
"last_log:{}, committed:{}, last_applied:{}, snapshot:{}, purged:{}",
self.last_log.display(),
self.committed.display(),
self.last_applied.display(),
self.snapshot.display(),
self.purged.display(),
)?;
if let Some(quorum_acked) = &self.last_quorum_acked {
write!(
f,
", quorum_acked_time:({}, {:?} ago)",
quorum_acked,
quorum_acked.elapsed()
)?;
} else {
write!(f, ", quorum_acked_time:None")?;
}
write!(
f,
", replication:{{{}}}, heartbeat:{{{}}}",
self.replication.as_ref().map(DisplayBTreeMapOptValue).display(),
self.heartbeat.as_ref().map(DisplayBTreeMapOptValue).display(),
)?;
write!(f, "}}")?;
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub id: C::NodeId,
pub vote: VoteOf<C>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,
pub membership_config: Arc<StoredMembershipOf<C>>,
}
impl<C> fmt::Display for RaftServerMetrics<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ServerMetrics{{")?;
write!(
f,
"id:{}, {:?}, vote:{}, leader:{}, membership:{}",
self.id,
self.state,
self.vote,
self.current_leader.display(),
self.membership_config,
)?;
write!(f, "}}")?;
Ok(())
}
}
impl<C> RaftServerMetrics<C>
where C: RaftTypeConfig
{
pub(crate) fn new_initial(id: C::NodeId) -> Self {
let vote = VoteOf::<C>::new_with_default_term(id.clone());
Self {
id,
vote,
state: Default::default(),
current_leader: None,
membership_config: Arc::new(Default::default()),
}
}
}
#[cfg(test)]
#[cfg(feature = "metrics-logids")]
mod tests {
use crate::engine::log_id_list::LogIdList;
use crate::engine::testing::UTConfig;
use crate::engine::testing::log_id;
use crate::metrics::RaftDataMetrics;
use crate::metrics::RaftMetrics;
use crate::type_config::alias::LogIdListOf;
#[test]
fn test_raft_metrics_new_initial_has_empty_log_id_list() {
let m = RaftMetrics::<UTConfig>::new_initial(0);
assert_eq!(m.log_id_list, LogIdListOf::<UTConfig>::default());
}
#[test]
fn test_raft_data_metrics_default_has_empty_log_id_list() {
let m = RaftDataMetrics::<UTConfig>::default();
assert_eq!(m.log_id_list, LogIdListOf::<UTConfig>::default());
}
#[test]
fn test_raft_metrics_log_id_list_equality() {
let list = LogIdList::new(None, vec![log_id(1, 0, 3), log_id(2, 0, 5)]);
let mut m1 = RaftMetrics::<UTConfig>::new_initial(0);
m1.log_id_list = list.clone();
let mut m2 = RaftMetrics::<UTConfig>::new_initial(0);
m2.log_id_list = list;
assert_eq!(m1, m2);
}
#[test]
fn test_raft_metrics_log_id_list_inequality() {
let mut m1 = RaftMetrics::<UTConfig>::new_initial(0);
m1.log_id_list = LogIdList::new(None, vec![log_id(1, 0, 3)]);
let mut m2 = RaftMetrics::<UTConfig>::new_initial(0);
m2.log_id_list = LogIdList::new(None, vec![log_id(2, 0, 5)]);
assert_ne!(m1, m2);
}
#[test]
#[allow(deprecated)]
fn test_raft_data_metrics_log_id_list_equality() {
let list = LogIdList::new(None, vec![log_id(1, 0, 3), log_id(2, 0, 5)]);
let m1 = RaftDataMetrics::<UTConfig> {
log_id_list: list.clone(),
..Default::default()
};
let m2 = RaftDataMetrics::<UTConfig> {
log_id_list: list,
..Default::default()
};
assert_eq!(m1, m2);
}
}