use std::sync::OnceLock;
use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Opts, TextEncoder};
static METRICS: OnceLock<MetricsInner> = OnceLock::new();
struct MetricsInner {
requests_total: IntCounterVec,
events_emitted_total: IntCounterVec,
errors_total: IntCounterVec,
output_errors_total: IntCounterVec,
events_dropped_total: IntCounterVec,
pending_events: IntGaugeVec,
request_duration_seconds: prometheus::HistogramVec,
circuit_breaker_state: IntGaugeVec,
}
pub fn init() -> Result<(), prometheus::Error> {
let requests_total = IntCounterVec::new(
Opts::new(
"helr_requests_total",
"Total HTTP requests by source and status",
),
&["source", "status"],
)?;
let events_emitted_total = IntCounterVec::new(
Opts::new(
"helr_events_emitted_total",
"Total events emitted to stdout by source",
),
&["source"],
)?;
let errors_total = IntCounterVec::new(
Opts::new("hel_errors_total", "Total errors by source"),
&["source"],
)?;
let output_errors_total = IntCounterVec::new(
Opts::new(
"helr_output_errors_total",
"Output write errors (e.g. broken pipe, disk full)",
),
&["source"],
)?;
let request_duration_seconds = prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"helr_request_duration_seconds",
"HTTP request duration in seconds by source",
)
.buckets(prometheus::exponential_buckets(0.05, 2.0, 10).unwrap()),
&["source"],
)?;
let circuit_breaker_state = IntGaugeVec::new(
Opts::new(
"helr_circuit_breaker_state",
"Circuit breaker state: 0=closed, 1=open, 2=half_open",
),
&["source"],
)?;
let events_dropped_total = IntCounterVec::new(
Opts::new(
"helr_events_dropped_total",
"Events dropped (e.g. due to backpressure) by source and reason",
),
&["source", "reason"],
)?;
let pending_events = IntGaugeVec::new(
Opts::new(
"helr_pending_events",
"Events currently queued for output by source",
),
&["source"],
)?;
prometheus::register(Box::new(requests_total.clone()))?;
prometheus::register(Box::new(events_emitted_total.clone()))?;
prometheus::register(Box::new(errors_total.clone()))?;
prometheus::register(Box::new(output_errors_total.clone()))?;
prometheus::register(Box::new(events_dropped_total.clone()))?;
prometheus::register(Box::new(pending_events.clone()))?;
prometheus::register(Box::new(request_duration_seconds.clone()))?;
prometheus::register(Box::new(circuit_breaker_state.clone()))?;
let _ = METRICS.set(MetricsInner {
requests_total,
events_emitted_total,
errors_total,
output_errors_total,
events_dropped_total,
pending_events,
request_duration_seconds,
circuit_breaker_state,
});
Ok(())
}
pub fn record_request(source: &str, status_class: &str, duration_secs: f64) {
if let Some(m) = METRICS.get() {
m.requests_total
.with_label_values(&[source, status_class])
.inc();
m.request_duration_seconds
.with_label_values(&[source])
.observe(duration_secs);
}
}
pub fn record_events(source: &str, count: u64) {
if let Some(m) = METRICS.get() {
m.events_emitted_total
.with_label_values(&[source])
.inc_by(count);
}
}
pub fn record_error(source: &str) {
if let Some(m) = METRICS.get() {
m.errors_total.with_label_values(&[source]).inc();
}
}
pub fn record_output_error(source: &str) {
if let Some(m) = METRICS.get() {
m.output_errors_total.with_label_values(&[source]).inc();
}
}
pub fn record_event_dropped(source: &str, reason: &str) {
if let Some(m) = METRICS.get() {
m.events_dropped_total
.with_label_values(&[source, reason])
.inc();
}
}
pub fn set_pending_events(source: &str, count: i64) {
if let Some(m) = METRICS.get() {
m.pending_events
.with_label_values(&[source])
.set(std::cmp::max(0, count));
}
}
pub fn set_circuit_state(source: &str, state: CircuitStateValue) {
if let Some(m) = METRICS.get() {
let v = match state {
CircuitStateValue::Closed => 0,
CircuitStateValue::Open => 1,
CircuitStateValue::HalfOpen => 2,
};
m.circuit_breaker_state.with_label_values(&[source]).set(v);
}
}
#[derive(Clone, Copy)]
pub enum CircuitStateValue {
Closed,
Open,
HalfOpen,
}
pub fn encode() -> String {
if METRICS.get().is_none() {
return String::new();
}
let families = prometheus::gather();
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
if encoder.encode(&families, &mut buffer).is_ok() {
String::from_utf8_lossy(&buffer).into_owned()
} else {
String::new()
}
}