#[cfg(feature = "metrics")]
use std::sync::OnceLock;
use std::time::{Duration, Instant};
pub use metrics::{counter, gauge, histogram};
#[cfg(feature = "metrics")]
static METRICS_INITIALIZED: OnceLock<()> = OnceLock::new();
#[cfg(feature = "metrics")]
pub fn init_metrics(
addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
METRICS_INITIALIZED.get_or_init(
|| match metrics_exporter_prometheus::PrometheusBuilder::new()
.with_http_listener(addr)
.install()
{
Ok(()) => {
tracing::info!(
"Prometheus metrics server listening on http://{}/metrics",
addr
);
}
Err(e) => {
tracing::error!("Failed to start Prometheus exporter: {}", e);
}
},
);
Ok(())
}
#[cfg(feature = "metrics")]
pub fn init_metrics_recorder(
) -> Result<metrics_exporter_prometheus::PrometheusHandle, Box<dyn std::error::Error + Send + Sync>>
{
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
let handle = builder.install_recorder()?;
Ok(handle)
}
#[cfg(not(feature = "metrics"))]
pub fn init_metrics(
_addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Ok(())
}
pub struct CoreMetrics;
impl CoreMetrics {
pub fn increment_messages_appended() {
metrics::counter!("rivven_core_messages_appended_total").increment(1);
}
pub fn add_messages_appended(count: u64) {
metrics::counter!("rivven_core_messages_appended_total").increment(count);
}
pub fn add_messages_read(count: u64) {
metrics::counter!("rivven_core_messages_read_total").increment(count);
}
pub fn increment_batch_appends() {
metrics::counter!("rivven_core_batch_appends_total").increment(1);
}
pub fn increment_schemas_registered() {
metrics::counter!("rivven_core_schemas_registered_total").increment(1);
}
pub fn increment_schema_cache_hits() {
metrics::counter!("rivven_core_schema_cache_hits_total").increment(1);
}
pub fn increment_schema_cache_misses() {
metrics::counter!("rivven_core_schema_cache_misses_total").increment(1);
}
pub fn increment_auth_failures(auth_type: &str) {
metrics::counter!("rivven_core_auth_failures_total", "type" => auth_type.to_string())
.increment(1);
}
pub fn increment_rate_limit_rejections() {
metrics::counter!("rivven_core_rate_limit_rejections_total").increment(1);
}
pub fn increment_circuit_breaker_trips() {
metrics::counter!("rivven_core_circuit_breaker_trips_total").increment(1);
}
pub fn increment_sql_injection_blocked() {
metrics::counter!("rivven_core_sql_injection_blocked_total").increment(1);
}
pub fn increment_connection_timeouts() {
metrics::counter!("rivven_core_connection_timeouts_total").increment(1);
}
pub fn increment_message_size_exceeded() {
metrics::counter!("rivven_core_message_size_exceeded_total").increment(1);
}
pub fn set_active_connections(count: u64) {
metrics::gauge!("rivven_core_active_connections").set(count as f64);
}
pub fn set_partition_count(count: u64) {
metrics::gauge!("rivven_core_partition_count").set(count as f64);
}
pub fn set_segment_count(count: u64) {
metrics::gauge!("rivven_core_segment_count").set(count as f64);
}
pub fn set_partition_offset(topic: &str, partition: u32, offset: u64) {
metrics::gauge!(
"rivven_core_partition_offset",
"topic" => topic.to_string(),
"partition" => partition.to_string()
)
.set(offset as f64);
}
pub fn set_schema_cache_size(size: u64) {
metrics::gauge!("rivven_core_schema_cache_size").set(size as f64);
}
pub fn record_append_latency_us(us: u64) {
metrics::histogram!("rivven_core_append_latency_seconds").record(us as f64 / 1_000_000.0);
}
pub fn record_batch_append_latency_us(us: u64) {
metrics::histogram!("rivven_core_batch_append_latency_seconds")
.record(us as f64 / 1_000_000.0);
}
pub fn record_read_latency_us(us: u64) {
metrics::histogram!("rivven_core_read_latency_seconds").record(us as f64 / 1_000_000.0);
}
pub fn record_schema_lookup_latency_ns(ns: u64) {
metrics::histogram!("rivven_core_schema_lookup_latency_seconds")
.record(ns as f64 / 1_000_000_000.0);
}
}
pub struct Timer {
start: Instant,
}
impl Timer {
pub fn new() -> Self {
Self {
start: Instant::now(),
}
}
pub fn elapsed_us(&self) -> u64 {
self.start.elapsed().as_micros() as u64
}
pub fn elapsed_ns(&self) -> u64 {
self.start.elapsed().as_nanos() as u64
}
pub fn elapsed(&self) -> Duration {
self.start.elapsed()
}
}
impl Default for Timer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_core_metrics_compile() {
CoreMetrics::increment_messages_appended();
CoreMetrics::add_messages_appended(100);
CoreMetrics::add_messages_read(50);
CoreMetrics::increment_batch_appends();
CoreMetrics::increment_schemas_registered();
CoreMetrics::increment_schema_cache_hits();
CoreMetrics::increment_schema_cache_misses();
CoreMetrics::increment_auth_failures("md5");
CoreMetrics::increment_rate_limit_rejections();
CoreMetrics::increment_circuit_breaker_trips();
CoreMetrics::increment_sql_injection_blocked();
CoreMetrics::increment_connection_timeouts();
CoreMetrics::increment_message_size_exceeded();
CoreMetrics::set_active_connections(10);
CoreMetrics::set_partition_count(100);
CoreMetrics::set_segment_count(1000);
CoreMetrics::set_partition_offset("orders", 0, 12345);
CoreMetrics::set_schema_cache_size(50);
CoreMetrics::record_append_latency_us(100);
CoreMetrics::record_batch_append_latency_us(500);
CoreMetrics::record_read_latency_us(200);
CoreMetrics::record_schema_lookup_latency_ns(30);
}
#[test]
fn test_timer() {
let timer = Timer::new();
std::thread::sleep(std::time::Duration::from_millis(1));
assert!(timer.elapsed_us() >= 1000);
assert!(timer.elapsed_ns() >= 1_000_000);
}
}