use std::sync::OnceLock;
use std::time::Duration;
pub struct RaftMetrics;
impl RaftMetrics {
pub fn increment_proposals() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_proposals_total").increment(1);
}
pub fn increment_elections() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_elections_total").increment(1);
}
pub fn increment_commits() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_commits_total").increment(1);
}
pub fn increment_snapshots() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_snapshots_total").increment(1);
}
pub fn increment_append_entries_sent() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_append_entries_sent_total").increment(1);
}
pub fn increment_append_entries_received() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_raft_append_entries_received_total").increment(1);
}
pub fn set_current_term(term: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_current_term").set(term as f64);
}
pub fn set_commit_index(index: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_commit_index").set(index as f64);
}
pub fn set_applied_index(index: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_applied_index").set(index as f64);
}
pub fn set_last_log_index(index: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_last_log_index").set(index as f64);
}
pub fn set_is_leader(is_leader: bool) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_is_leader").set(if is_leader { 1.0 } else { 0.0 });
}
pub fn set_peer_count(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_peer_count").set(count as f64);
}
pub fn set_replication_lag(node_id: u64, lag: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_raft_replication_lag", "node_id" => node_id.to_string())
.set(lag as f64);
}
pub fn record_proposal_latency(duration: Duration) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_raft_proposal_latency_seconds").record(duration.as_secs_f64());
}
pub fn record_append_entries_latency(duration: Duration) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_raft_append_entries_latency_seconds")
.record(duration.as_secs_f64());
}
pub fn record_vote_latency(duration: Duration) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_raft_vote_latency_seconds").record(duration.as_secs_f64());
}
pub fn record_snapshot_duration(duration: Duration) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_raft_snapshot_duration_seconds").record(duration.as_secs_f64());
}
pub fn record_batch_size(size: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_raft_batch_size").record(size as f64);
}
}
pub struct ClusterMetrics;
impl ClusterMetrics {
pub fn set_node_count(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_cluster_node_count").set(count as f64);
}
pub fn set_topic_count(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_cluster_topic_count").set(count as f64);
}
pub fn set_partition_count(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_cluster_partition_count").set(count as f64);
}
pub fn set_under_replicated_partitions(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_cluster_under_replicated_partitions").set(count as f64);
}
pub fn set_offline_partitions(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_cluster_offline_partitions").set(count as f64);
}
pub fn increment_swim_pings() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_cluster_swim_pings_total").increment(1);
}
pub fn increment_swim_failures_detected() {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_cluster_swim_failures_detected_total").increment(1);
}
}
pub struct NetworkMetrics;
impl NetworkMetrics {
pub fn add_bytes_sent(bytes: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_network_bytes_sent_total").increment(bytes);
}
pub fn add_bytes_received(bytes: u64) {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_network_bytes_received_total").increment(bytes);
}
pub fn set_active_connections(count: usize) {
#[cfg(feature = "metrics-prometheus")]
metrics::gauge!("rivven_network_active_connections").set(count as f64);
}
pub fn record_rpc_latency(rpc_type: &str, duration: Duration) {
#[cfg(feature = "metrics-prometheus")]
metrics::histogram!("rivven_network_rpc_latency_seconds", "rpc_type" => rpc_type.to_string())
.record(duration.as_secs_f64());
}
pub fn increment_rpc_errors(rpc_type: &str) {
#[cfg(feature = "metrics-prometheus")]
metrics::counter!("rivven_network_rpc_errors_total", "rpc_type" => rpc_type.to_string())
.increment(1);
}
}
#[cfg(feature = "metrics-prometheus")]
mod prom {
use metrics_exporter_prometheus::PrometheusBuilder;
use std::net::SocketAddr;
pub fn init_prometheus_exporter(
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
PrometheusBuilder::new()
.with_http_listener(addr)
.install()?;
tracing::info!(
"Prometheus metrics exporter listening on http://{}/metrics",
addr
);
Ok(())
}
pub fn init_prometheus_recorder() -> Result<
metrics_exporter_prometheus::PrometheusHandle,
Box<dyn std::error::Error + Send + Sync>,
> {
let builder = PrometheusBuilder::new();
let handle = builder.install_recorder()?;
Ok(handle)
}
}
#[cfg(feature = "metrics-prometheus")]
pub use prom::{init_prometheus_exporter, init_prometheus_recorder};
static METRICS_INITIALIZED: OnceLock<()> = OnceLock::new();
pub fn init_metrics(
prometheus_addr: Option<std::net::SocketAddr>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
METRICS_INITIALIZED.get_or_init(|| {
#[cfg(feature = "metrics-prometheus")]
if let Some(addr) = prometheus_addr {
if let Err(e) = init_prometheus_exporter(addr) {
tracing::error!("Failed to start Prometheus exporter: {}", e);
}
}
tracing::info!(
prometheus = cfg!(feature = "metrics-prometheus"),
"Metrics subsystem initialized"
);
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_raft_metrics_compile() {
RaftMetrics::increment_proposals();
RaftMetrics::increment_elections();
RaftMetrics::set_current_term(1);
RaftMetrics::set_is_leader(true);
RaftMetrics::record_proposal_latency(Duration::from_millis(10));
}
#[test]
fn test_cluster_metrics_compile() {
ClusterMetrics::set_node_count(3);
ClusterMetrics::set_topic_count(10);
ClusterMetrics::set_partition_count(100);
}
#[test]
fn test_network_metrics_compile() {
NetworkMetrics::add_bytes_sent(1024);
NetworkMetrics::add_bytes_received(2048);
NetworkMetrics::set_active_connections(5);
NetworkMetrics::record_rpc_latency("append_entries", Duration::from_micros(500));
}
}