use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};
use std::time::Instant;
use crate::application::Application;
use crate::middleware::Middleware;
use crate::request::Request;
use crate::response::Response;
use crate::server::ConnectionInfo;
/// Process-wide readiness flag; starts out `false`.
/// NOTE(review): presumably flipped by the server once it can accept traffic — confirm at the call sites.
pub static SERVER_READY: AtomicBool = AtomicBool::new(false);
/// Running request count; bumped by `record_request` and exported as `rws_requests_total`.
pub static REQUESTS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Running error count; bumped by `record_error` and exported as `rws_errors_total`.
pub static ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Open-connection gauge; adjusted by `connection_open` / `connection_close` and exported as
/// `rws_active_connections`. Signed, so an unmatched close shows up as a negative value.
pub static ACTIVE_CONNECTIONS: AtomicI64 = AtomicI64::new(0);
/// Adds one to the global [`REQUESTS_TOTAL`] counter.
///
/// Uses relaxed ordering: the value is only ever read as a standalone number.
pub fn record_request() {
    REQUESTS_TOTAL.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to the global [`ERRORS_TOTAL`] counter.
///
/// Uses relaxed ordering: the value is only ever read as a standalone number.
pub fn record_error() {
    ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
}
/// Raises the [`ACTIVE_CONNECTIONS`] gauge by one.
///
/// Pair every call with a later [`connection_close`] so the gauge returns to its
/// previous value.
pub fn connection_open() {
    ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
}
/// Lowers the [`ACTIVE_CONNECTIONS`] gauge by one.
///
/// The gauge is signed, so a close without a matching [`connection_open`] drives it
/// below zero rather than wrapping.
pub fn connection_close() {
    ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
}
/// Inclusive upper bounds, in seconds, of the latency histogram buckets (ascending).
const BUCKET_BOUNDS: [f64; 11] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Cumulative latency histogram with one slot per entry of [`BUCKET_BOUNDS`].
struct HistogramEntry {
    /// `buckets[i]` is the number of observations `<= BUCKET_BOUNDS[i]`.
    buckets: [u64; BUCKET_BOUNDS.len()],
    /// Sum of every observed value, in seconds.
    sum: f64,
    /// Total number of observations, including those above the largest bound.
    count: u64,
}

impl HistogramEntry {
    /// Creates a histogram with no observations.
    fn new() -> Self {
        Self {
            buckets: [0; BUCKET_BOUNDS.len()],
            sum: 0.0,
            count: 0,
        }
    }

    /// Records one observation of `secs` seconds.
    ///
    /// Buckets are cumulative: every bucket whose bound is at least `secs` is
    /// incremented, not just the first one that fits. Values above the largest
    /// bound touch no bucket and are only reflected in `sum` and `count`.
    fn observe(&mut self, secs: f64) {
        self.count += 1;
        self.sum += secs;
        for (slot, &upper) in self.buckets.iter_mut().zip(BUCKET_BOUNDS.iter()) {
            if secs <= upper {
                *slot += 1;
            }
        }
    }
}
/// Aggregated metrics for a single `(method, path)` route.
struct RouteEntry {
    /// Number of responses recorded, keyed by status code.
    counts: HashMap<i16, u64>,
    /// Latency histogram over every request recorded for the route.
    latency: HistogramEntry,
}

impl RouteEntry {
    /// Creates an entry with no recorded statuses and an empty histogram.
    fn new() -> Self {
        let counts = HashMap::new();
        let latency = HistogramEntry::new();
        Self { counts, latency }
    }
}
/// Holds the metrics of every route seen so far.
struct RouteStore {
/// Per-route metrics keyed by `(method, path)`, in the order `record_route` builds the key.
entries: HashMap<(String, String), RouteEntry>,
}
/// Lazily created, process-wide store of per-route metrics.
static ROUTE_STORE: OnceLock<Mutex<RouteStore>> = OnceLock::new();

/// Returns the global route store, creating an empty one on first use.
fn route_store() -> &'static Mutex<RouteStore> {
    /// Builds the initial, empty store.
    fn empty_store() -> Mutex<RouteStore> {
        let entries = HashMap::new();
        Mutex::new(RouteStore { entries })
    }

    ROUTE_STORE.get_or_init(empty_store)
}
/// Records one finished request against its route.
///
/// Increments the counter for `status` on the `(method, path)` route and adds
/// `elapsed_secs` to that route's latency histogram. A route is created the first
/// time it is seen.
///
/// NOTE(review): every distinct `(method, path)` pair creates a new entry that is
/// never removed here, so the store grows with the number of distinct paths recorded.
///
/// # Panics
///
/// Panics if the route store mutex is poisoned.
pub fn record_route(method: &str, path: &str, status: i16, elapsed_secs: f64) {
    // Build the owned key before taking the lock so the allocations happen outside it.
    let route_key = (method.to_owned(), path.to_owned());
    let mut store = route_store().lock().unwrap();
    let route = store
        .entries
        .entry(route_key)
        .or_insert_with(RouteEntry::new);
    route.latency.observe(elapsed_secs);
    *route.counts.entry(status).or_default() += 1;
}
/// Returns the part of `uri` before the first `?`, or all of `uri` when it has none.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(path, _query)| path)
}
/// Middleware that records per-route request counts and latency.
///
/// It times the downstream [`Application`] call and reports the outcome through
/// [`record_route`], using the request method and the request URI with its query
/// string removed as the route key.
pub struct MetricsLayer;

impl Middleware for MetricsLayer {
    /// Runs `next`, records the result in the route metrics, and returns the
    /// downstream result unchanged.
    ///
    /// A downstream `Err` is recorded with status `500`; an `Ok` response is recorded
    /// with its own `status_code`.
    fn handle(
        &self,
        request: &Request,
        connection: &ConnectionInfo,
        next: &dyn Application,
    ) -> Result<Response, String> {
        let start = Instant::now();
        let result = next.execute(request, connection);
        let elapsed = start.elapsed().as_secs_f64();

        // An error carries no status code of its own, so it is counted as a 500.
        let status = match &result {
            Ok(response) => response.status_code,
            Err(_) => 500,
        };

        // Borrow the path straight from the request: `record_route` only needs a
        // `&str`, so an intermediate owned `String` would be a wasted allocation.
        let path = strip_query(&request.request_uri);
        record_route(&request.method, path, status, elapsed);
        result
    }
}
// Unit tests are compiled only in test builds; the module body lives in a separate file.
#[cfg(test)]
mod tests;
/// Renders all metrics in the Prometheus text exposition format.
///
/// The output always contains the three global metric families
/// (`rws_requests_total`, `rws_errors_total`, `rws_active_connections`), each
/// separated by a blank line. When at least one route has been recorded, the
/// per-route families follow after another blank line.
pub fn prometheus_text() -> String {
    let requests_section = format!(
        "# HELP rws_requests_total Total HTTP requests handled\n\
         # TYPE rws_requests_total counter\n\
         rws_requests_total {}\n",
        REQUESTS_TOTAL.load(Ordering::Relaxed)
    );
    let errors_section = format!(
        "# HELP rws_errors_total HTTP requests that returned an application error\n\
         # TYPE rws_errors_total counter\n\
         rws_errors_total {}\n",
        ERRORS_TOTAL.load(Ordering::Relaxed)
    );
    let connections_section = format!(
        "# HELP rws_active_connections Currently open connections\n\
         # TYPE rws_active_connections gauge\n\
         rws_active_connections {}\n",
        ACTIVE_CONNECTIONS.load(Ordering::Relaxed)
    );

    let mut sections = vec![requests_section, errors_section, connections_section];
    let routes_section = route_prometheus_text();
    if !routes_section.is_empty() {
        sections.push(routes_section);
    }

    // Every section already ends in a newline, so joining with "\n" yields exactly
    // one blank line between consecutive sections.
    sections.join("\n")
}
fn route_prometheus_text() -> String {
let guard = route_store().lock().unwrap();
if guard.entries.is_empty() {
return String::new();
}
let mut keys: Vec<&(String, String)> = guard.entries.keys().collect();
keys.sort();
let mut out = String::new();
out.push_str("# HELP rws_route_requests_total Total requests handled per route\n");
out.push_str("# TYPE rws_route_requests_total counter\n");
for key in &keys {
let entry = &guard.entries[key];
let (method, path) = key;
let mut statuses: Vec<i16> = entry.counts.keys().cloned().collect();
statuses.sort();
for status in statuses {
let count = entry.counts[&status];
out.push_str(&format!(
"rws_route_requests_total{{method=\"{}\",path=\"{}\",status=\"{}\"}} {}\n",
method, path, status, count,
));
}
}
out.push('\n');
out.push_str("# HELP rws_route_duration_seconds Request duration in seconds per route\n");
out.push_str("# TYPE rws_route_duration_seconds histogram\n");
for key in &keys {
let entry = &guard.entries[key];
let (method, path) = key;
let lat = &entry.latency;
for (i, &upper) in BUCKET_BOUNDS.iter().enumerate() {
out.push_str(&format!(
"rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"{}\"}} {}\n",
method, path, upper, lat.buckets[i],
));
}
out.push_str(&format!(
"rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"+Inf\"}} {}\n",
method, path, lat.count,
));
out.push_str(&format!(
"rws_route_duration_seconds_sum{{method=\"{}\",path=\"{}\"}} {:.9}\n",
method, path, lat.sum,
));
out.push_str(&format!(
"rws_route_duration_seconds_count{{method=\"{}\",path=\"{}\"}} {}\n",
method, path, lat.count,
));
}
out
}