use std::sync::atomic::{AtomicBool, Ordering};
use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
/// Process-wide switch for metric recording.
///
/// Starts out `false`; written by `set_enabled` and read by `is_enabled`,
/// both with relaxed ordering.
static METRICS_ENABLED: AtomicBool = AtomicBool::new(false);
/// Turns metric recording on (`true`) or off (`false`).
///
/// Stores `enabled` into the global `METRICS_ENABLED` flag using
/// `Ordering::Relaxed`.
pub fn set_enabled(enabled: bool) {
METRICS_ENABLED.store(enabled, Ordering::Relaxed);
}
/// Returns the current value of the global `METRICS_ENABLED` flag.
///
/// The recording helpers in this file call this first and return early when
/// it yields `false`.
#[inline(always)]
fn is_enabled() -> bool {
METRICS_ENABLED.load(Ordering::Relaxed)
}
/// Enables metric recording and installs a Prometheus recorder.
///
/// Returns the handle produced by `PrometheusBuilder::install_recorder` when
/// installation succeeds. When it fails, the error is discarded and the
/// returned handle comes from a recorder created with `build_recorder`, which
/// this function does not install.
///
/// NOTE(review): a handle obtained through the fallback presumably does not
/// see metrics recorded through the globally installed recorder — confirm
/// with callers that render it.
pub fn init_metrics() -> PrometheusHandle {
    set_enabled(true);
    match PrometheusBuilder::new().install_recorder() {
        Ok(handle) => handle,
        Err(_) => PrometheusBuilder::new().build_recorder().handle(),
    }
}
/// Adds one to the `messages_total` counter.
///
/// The counter is labelled with `channel` (copied into an owned string) and
/// `status`. Nothing is recorded while metrics are disabled.
pub fn record_message(channel: &str, status: &'static str) {
    if is_enabled() {
        let messages = counter!(
            "messages_total",
            "channel" => channel.to_owned(),
            "status" => status
        );
        messages.increment(1);
    }
}
/// Adds one to the `errors_total` counter, labelled `type = error_type`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_error(error_type: &'static str) {
    if is_enabled() {
        let errors = counter!("errors_total", "type" => error_type);
        errors.increment(1);
    }
}
/// Records `duration_secs` into the `message_duration_seconds` histogram.
///
/// The sample is labelled with `channel`. Nothing is recorded while metrics
/// are disabled.
pub fn record_message_duration(channel: &str, duration_secs: f64) {
    if is_enabled() {
        let durations = histogram!(
            "message_duration_seconds",
            "channel" => channel.to_owned()
        );
        durations.record(duration_secs);
    }
}
/// Adds one to the `circuit_breaker_trips_total` counter.
///
/// Labels: `connector` and `channel`. Nothing is recorded while metrics are
/// disabled.
pub fn record_circuit_breaker_trip(connector: &str, channel: &str) {
    if is_enabled() {
        let trips = counter!(
            "circuit_breaker_trips_total",
            "connector" => connector.to_owned(),
            "channel" => channel.to_owned()
        );
        trips.increment(1);
    }
}
/// Adds one to the `circuit_breaker_rejections_total` counter.
///
/// Labels: `connector` and `channel`. Nothing is recorded while metrics are
/// disabled.
pub fn record_circuit_breaker_rejection(connector: &str, channel: &str) {
    if is_enabled() {
        let rejections = counter!(
            "circuit_breaker_rejections_total",
            "connector" => connector.to_owned(),
            "channel" => channel.to_owned()
        );
        rejections.increment(1);
    }
}
/// Sets the `active_workflows` gauge to `count`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_active_workflows(count: f64) {
    if is_enabled() {
        let active = gauge!("active_workflows");
        active.set(count);
    }
}
/// Records one HTTP request in two metrics sharing the same label set.
///
/// First increments the `http_requests_total` counter, then records
/// `duration_secs` into the `http_request_duration_seconds` histogram. Both
/// are labelled with `method`, `path` and the decimal text of `status`.
/// Nothing is recorded while metrics are disabled.
pub fn record_http_request(method: String, path: String, status: u16, duration_secs: f64) {
    if is_enabled() {
        let status = status.to_string();

        // The counter gets copies so the histogram below can take ownership
        // of the original label values.
        let requests = counter!(
            "http_requests_total",
            "method" => method.clone(),
            "path" => path.clone(),
            "status" => status.clone()
        );
        requests.increment(1);

        let latencies = histogram!(
            "http_request_duration_seconds",
            "method" => method,
            "path" => path,
            "status" => status
        );
        latencies.record(duration_secs);
    }
}
/// Records `duration_secs` into the `db_query_duration_seconds` histogram.
///
/// The sample is labelled `operation = operation`. Nothing is recorded while
/// metrics are disabled.
pub fn record_db_query_duration(operation: &'static str, duration_secs: f64) {
    if is_enabled() {
        let durations = histogram!("db_query_duration_seconds", "operation" => operation);
        durations.record(duration_secs);
    }
}
/// Awaits `f`, reports how long that took, and passes its output through.
///
/// The elapsed wall-clock time (as fractional seconds) is handed to
/// `record_db_query_duration` under the `operation` label once the future
/// has completed. The future's output is returned unchanged.
pub async fn timed_db_op<F, T>(operation: &'static str, f: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let started_at = std::time::Instant::now();
    let output = f.await;
    let elapsed_secs = started_at.elapsed().as_secs_f64();
    record_db_query_duration(operation, elapsed_secs);
    output
}
/// Records `duration_secs` into the `engine_lock_wait_seconds` histogram.
///
/// The sample is labelled `mode = mode`. Nothing is recorded while metrics
/// are disabled.
pub fn record_engine_lock_wait(mode: &'static str, duration_secs: f64) {
    if is_enabled() {
        let waits = histogram!("engine_lock_wait_seconds", "mode" => mode);
        waits.record(duration_secs);
    }
}
/// Records `duration_secs` into the unlabelled
/// `engine_reload_duration_seconds` histogram.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_engine_reload_duration(duration_secs: f64) {
    if is_enabled() {
        let reload_durations = histogram!("engine_reload_duration_seconds");
        reload_durations.record(duration_secs);
    }
}
/// Adds one to the `engine_reloads_total` counter, labelled `status`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_engine_reload(status: &'static str) {
    if is_enabled() {
        let reloads = counter!("engine_reloads_total", "status" => status);
        reloads.increment(1);
    }
}
/// Adds one to the `channel_executions_total` counter for `channel`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_channel_execution(channel: &str) {
    if is_enabled() {
        let executions = counter!(
            "channel_executions_total",
            "channel" => channel.to_owned()
        );
        executions.increment(1);
    }
}
/// Adds one to the `rate_limit_rejections_total` counter for `client`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_rate_limit_rejected(client: &str) {
    if is_enabled() {
        let rejections = counter!(
            "rate_limit_rejections_total",
            "client" => client.to_owned()
        );
        rejections.increment(1);
    }
}
/// Adds one to the `response_cache_hits_total` counter for `channel`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_cache_hit(channel: &str) {
    if is_enabled() {
        let hits = counter!(
            "response_cache_hits_total",
            "channel" => channel.to_owned()
        );
        hits.increment(1);
    }
}
/// Adds one to the `response_cache_misses_total` counter for `channel`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_cache_miss(channel: &str) {
    if is_enabled() {
        let misses = counter!(
            "response_cache_misses_total",
            "channel" => channel.to_owned()
        );
        misses.increment(1);
    }
}
/// Sets the `trace_queue_depth` gauge to `depth`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_trace_queue_depth(depth: f64) {
    if is_enabled() {
        let queue_depth = gauge!("trace_queue_depth");
        queue_depth.set(depth);
    }
}
/// Sets the `trace_workers_active` gauge to `count`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_trace_workers_active(count: f64) {
    if is_enabled() {
        let active_workers = gauge!("trace_workers_active");
        active_workers.set(count);
    }
}
/// Sets the `trace_workers_total` gauge to `count`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_trace_workers_total(count: f64) {
    if is_enabled() {
        let total_workers = gauge!("trace_workers_total");
        total_workers.set(count);
    }
}
/// Sets the `trace_queue_memory_bytes` gauge to `bytes`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_trace_queue_memory_bytes(bytes: f64) {
    if is_enabled() {
        let queue_memory = gauge!("trace_queue_memory_bytes");
        queue_memory.set(bytes);
    }
}
/// Adds one to the `trace_dropped_total` counter, labelled `reason`.
///
/// Nothing is recorded while metrics are disabled.
pub fn record_trace_dropped(reason: &'static str) {
    if is_enabled() {
        let dropped = counter!("trace_dropped_total", "reason" => reason);
        dropped.increment(1);
    }
}
/// Sets the `trace_persistence_queue_depth` gauge to `depth`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_trace_persistence_queue_depth(depth: f64) {
    if is_enabled() {
        let queue_depth = gauge!("trace_persistence_queue_depth");
        queue_depth.set(depth);
    }
}
/// Records `size` into the `trace_persistence_batch_size` histogram.
///
/// The value is converted to `f64` with an `as` cast before recording.
/// Nothing is recorded while metrics are disabled.
pub fn record_trace_persistence_batch_size(size: usize) {
    if is_enabled() {
        let batch_sizes = histogram!("trace_persistence_batch_size");
        batch_sizes.record(size as f64);
    }
}
/// Adds one to the `connector_requests_total` counter.
///
/// Labels: `connector`, `channel` and `status`. Nothing is recorded while
/// metrics are disabled.
pub fn record_connector_request(connector: &str, channel: &str, status: &'static str) {
    if is_enabled() {
        let requests = counter!(
            "connector_requests_total",
            "connector" => connector.to_owned(),
            "channel" => channel.to_owned(),
            "status" => status
        );
        requests.increment(1);
    }
}
/// Records `duration_secs` into the `connector_request_duration_seconds`
/// histogram.
///
/// Labels: `connector` and `channel`. Nothing is recorded while metrics are
/// disabled.
pub fn record_connector_duration(connector: &str, channel: &str, duration_secs: f64) {
    if is_enabled() {
        let durations = histogram!(
            "connector_request_duration_seconds",
            "connector" => connector.to_owned(),
            "channel" => channel.to_owned()
        );
        durations.record(duration_secs);
    }
}
/// Sets the `kafka_consumer_lag` gauge to `lag`.
///
/// Labels: `topic` and the decimal text of `partition`. Nothing is recorded
/// while metrics are disabled.
pub fn set_kafka_consumer_lag(topic: &str, partition: i32, lag: f64) {
    if is_enabled() {
        let consumer_lag = gauge!(
            "kafka_consumer_lag",
            "topic" => topic.to_owned(),
            "partition" => partition.to_string()
        );
        consumer_lag.set(lag);
    }
}
/// Sets the `db_pool_size` gauge to `size`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_db_pool_size(size: f64) {
    if is_enabled() {
        let pool_size = gauge!("db_pool_size");
        pool_size.set(size);
    }
}
/// Sets the `db_pool_idle` gauge to `idle`.
///
/// Nothing is recorded while metrics are disabled.
pub fn set_db_pool_idle(idle: f64) {
    if is_enabled() {
        let pool_idle = gauge!("db_pool_idle");
        pool_idle.set(idle);
    }
}
/// Adds one to the `admin_audit_events_total` counter.
///
/// Labels: `action` and `resource_type`. Nothing is recorded while metrics
/// are disabled.
pub fn record_admin_audit(action: &str, resource_type: &str) {
    if is_enabled() {
        let audit_events = counter!(
            "admin_audit_events_total",
            "action" => action.to_owned(),
            "resource_type" => resource_type.to_owned()
        );
        audit_events.increment(1);
    }
}
#[cfg(test)]
mod tests {
    //! Smoke tests: each one enables metrics and calls a recording helper,
    //! passing as long as the call does not panic.

    use super::*;

    /// Tries to install a Prometheus recorder, then enables metrics.
    ///
    /// The install result is deliberately ignored, so calling this from
    /// several tests is harmless whether or not installation succeeds.
    fn install_test_recorder() {
        let _ = PrometheusBuilder::new().install_recorder();
        set_enabled(true);
    }

    #[test]
    fn test_record_message() {
        install_test_recorder();
        for status in ["ok", "error"] {
            record_message("test-channel", status);
        }
    }

    #[test]
    fn test_record_error() {
        install_test_recorder();
        for error_type in ["engine", "storage"] {
            record_error(error_type);
        }
    }

    #[test]
    fn test_record_message_duration() {
        install_test_recorder();
        record_message_duration("orders", 0.123);
    }

    #[test]
    fn test_record_circuit_breaker_trip() {
        install_test_recorder();
        record_circuit_breaker_trip("my-connector", "orders");
    }

    #[test]
    fn test_record_circuit_breaker_rejection() {
        install_test_recorder();
        record_circuit_breaker_rejection("my-connector", "orders");
    }

    #[test]
    fn test_set_active_workflows() {
        install_test_recorder();
        for count in [5.0, 0.0] {
            set_active_workflows(count);
        }
    }

    #[test]
    fn test_record_http_request() {
        install_test_recorder();
        let requests = [
            ("GET", "/health", 200, 0.005),
            ("POST", "/api/v1/data/orders", 201, 0.010),
        ];
        for (method, path, status, duration_secs) in requests {
            record_http_request(method.to_owned(), path.to_owned(), status, duration_secs);
        }
    }

    #[test]
    fn test_record_db_query_duration() {
        install_test_recorder();
        record_db_query_duration("list_rules", 0.010);
    }

    #[tokio::test]
    async fn test_timed_db_op() {
        install_test_recorder();
        // The wrapper must hand back the future's own output.
        let value = timed_db_op("test_op", async { 42 }).await;
        assert_eq!(value, 42);
    }

    #[test]
    fn test_record_engine_lock_wait() {
        install_test_recorder();
        for (mode, waited_secs) in [("read", 0.001), ("write", 0.050)] {
            record_engine_lock_wait(mode, waited_secs);
        }
    }

    #[test]
    fn test_record_engine_reload_duration() {
        install_test_recorder();
        record_engine_reload_duration(0.250);
    }

    #[test]
    fn test_record_engine_reload() {
        install_test_recorder();
        for status in ["success", "failure"] {
            record_engine_reload(status);
        }
    }

    #[test]
    fn test_record_channel_execution() {
        install_test_recorder();
        record_channel_execution("orders");
    }

    #[test]
    fn test_record_rate_limit_rejected() {
        install_test_recorder();
        record_rate_limit_rejected("192.168.1.1");
    }

    #[test]
    fn test_init_metrics() {
        // Only checks that a handle is returned and that it renders ASCII.
        let rendered = init_metrics().render();
        assert!(rendered.is_ascii());
    }
}