pub mod tracing;
use axum::{
body::Body,
extract::{Request, State},
http::{header, HeaderValue, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use dashmap::DashMap;
use rusmes_config::{MetricsBasicAuthConfig, MetricsConfig};
use rusmes_proto::Mail;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};
use tokio::net::TcpListener;
/// Process-wide collector slot.
///
/// Filled either explicitly through `set_global_metrics` or lazily with a
/// fresh `MetricsCollector::new()` on the first `global_metrics()` call.
static GLOBAL_METRICS: OnceLock<MetricsCollector> = OnceLock::new();
/// Error returned by `set_global_metrics` when the global collector slot
/// already holds a value.
#[derive(Debug, thiserror::Error)]
#[error("global MetricsCollector has already been initialised")]
pub struct GlobalMetricsAlreadySet;
/// Installs `collector` as the process-wide metrics collector.
///
/// # Errors
///
/// Returns [`GlobalMetricsAlreadySet`] when the global slot is already
/// occupied (by an earlier call, or by a lazy initialisation performed by
/// [`global_metrics`]). The rejected collector is dropped.
pub fn set_global_metrics(collector: MetricsCollector) -> Result<(), GlobalMetricsAlreadySet> {
    match GLOBAL_METRICS.set(collector) {
        Ok(()) => Ok(()),
        Err(_rejected) => Err(GlobalMetricsAlreadySet),
    }
}
/// Returns the process-wide collector.
///
/// When nothing has been installed yet, a collector built by
/// `MetricsCollector::new` is created, stored and returned; every later call
/// yields that same instance.
pub fn global_metrics() -> &'static MetricsCollector {
    let shared: &'static MetricsCollector = GLOBAL_METRICS.get_or_init(|| MetricsCollector::new());
    shared
}
/// Label values intended for the `tls` label of `rusmes_tls_sessions_total`
/// (pass them to `MetricsCollector::inc_tls_session`).
pub mod tls_label {
/// NOTE(review): presumably a session that was encrypted from the start — confirm with callers.
pub const YES: &str = "yes";
/// NOTE(review): presumably a plaintext session — confirm with callers.
pub const NO: &str = "no";
/// NOTE(review): presumably a session upgraded via STARTTLS — confirm with callers.
pub const STARTTLS: &str = "starttls";
}
/// Shareable callback that returns a `domain -> count` snapshot.
///
/// Registered with `MetricsCollector::set_domain_stats_source` and invoked by
/// `MetricsCollector::refresh_domain_stats_now`.
pub type DomainStatsSource = Arc<dyn Fn() -> HashMap<String, u64> + Send + Sync>;
/// Fixed-bucket cumulative histogram backed by atomics.
///
/// * `buckets[i]` is the inclusive upper bound of bucket `i`.
/// * `counts[i]` is the number of observations `<= buckets[i]` (cumulative).
/// * `sum` holds the **bit pattern of an `f64`** running total of all observed
///   values; it is not an integer. Storing a float keeps full precision, where
///   a whole-millisecond integer would round sub-millisecond samples to zero.
/// * `count` is the total number of observations.
#[derive(Debug, Clone)]
struct Histogram {
    buckets: Vec<f64>,
    counts: Vec<Arc<AtomicU64>>,
    sum: Arc<AtomicU64>,
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Creates an empty histogram with one counter per entry of `buckets`.
    fn new(buckets: Vec<f64>) -> Self {
        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets,
            counts,
            // The all-zero bit pattern is `0.0_f64`.
            sum: Arc::new(AtomicU64::new(0.0_f64.to_bits())),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records one observation.
    ///
    /// Every observation increments `count` and each bucket whose bound is
    /// `>= value`. Only finite, positive values contribute to the sum; NaN,
    /// infinities and negative values add nothing, so the exported `_sum`
    /// can never decrease or become NaN.
    fn observe(&self, value: f64) {
        let contribution = if value.is_finite() && value > 0.0 {
            value
        } else {
            0.0
        };
        // There is no atomic f64 add, so emulate it with a compare-and-swap
        // loop over the bit pattern. The closure always returns `Some`, hence
        // the update cannot fail and the result can be ignored.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + contribution).to_bits())
            });
        self.count.fetch_add(1, Ordering::Relaxed);
        for (bound, slot) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                slot.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Renders the histogram in Prometheus text exposition format:
    /// `# HELP`, `# TYPE`, one `_bucket` line per bound, the `+Inf` bucket,
    /// then `_sum` and `_count`.
    fn export(&self, name: &str, help: &str) -> String {
        let total = self.count.load(Ordering::Relaxed);
        let sum = f64::from_bits(self.sum.load(Ordering::Relaxed));

        let mut output = String::new();
        output.push_str(&format!("# HELP {} {}\n", name, help));
        output.push_str(&format!("# TYPE {} histogram\n", name));
        for (bound, slot) in self.buckets.iter().zip(&self.counts) {
            output.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                name,
                bound,
                slot.load(Ordering::Relaxed)
            ));
        }
        output.push_str(&format!("{}_bucket{{le=\"+Inf\"}} {}\n", name, total));
        output.push_str(&format!("{}_sum {}\n", name, sum));
        output.push_str(&format!("{}_count {}\n", name, total));
        output
    }
}
/// Stopwatch bound to a histogram.
///
/// The clock starts when the timer is constructed; calling [`Timer::observe`]
/// records the elapsed time, in seconds, into the histogram. Dropping the
/// timer without calling `observe` records nothing.
pub struct Timer {
    start: Instant,
    histogram: Arc<Histogram>,
}

impl Timer {
    /// Starts a timer that will report into `histogram`.
    fn new(histogram: Arc<Histogram>) -> Self {
        let start = Instant::now();
        Timer { start, histogram }
    }

    /// Consumes the timer and records the seconds elapsed since creation.
    pub fn observe(self) {
        let Timer { start, histogram } = self;
        let elapsed_secs = start.elapsed().as_secs_f64();
        histogram.observe(elapsed_secs);
    }
}
/// Scope guard for the active-connection gauge.
///
/// Produced by `MetricsCollector::connection_guard`, which increments the
/// gauge for `protocol`; dropping the guard decrements it again.
pub struct ConnectionGuard {
    metrics: MetricsCollector,
    protocol: String,
}

impl Drop for ConnectionGuard {
    /// Decrements the active-connection gauge of the guarded protocol.
    fn drop(&mut self) {
        let ConnectionGuard { metrics, protocol } = self;
        metrics.dec_active_connections(protocol.as_str());
    }
}
/// Central metrics registry.
///
/// Every field is behind an `Arc`, so cloning the collector is cheap and all
/// clones update and read the same underlying values.
#[derive(Clone)]
pub struct MetricsCollector {
// --- SMTP counters ---
smtp_connections_total: Arc<AtomicU64>,
smtp_messages_received: Arc<AtomicU64>,
smtp_messages_sent: Arc<AtomicU64>,
smtp_errors: Arc<AtomicU64>,
smtp_auth_success_total: Arc<AtomicU64>,
smtp_auth_failure_total: Arc<AtomicU64>,
smtp_messages_rejected_total: Arc<AtomicU64>,
smtp_connections_rejected_blocked: Arc<AtomicU64>,
smtp_connections_rejected_overload: Arc<AtomicU64>,
// --- IMAP counters ---
imap_connections_total: Arc<AtomicU64>,
imap_commands_total: Arc<AtomicU64>,
imap_errors: Arc<AtomicU64>,
// --- JMAP counters ---
jmap_requests_total: Arc<AtomicU64>,
jmap_errors: Arc<AtomicU64>,
// --- WebPush counters ---
push_deliveries_total: Arc<AtomicU64>,
push_delivery_failures_total: Arc<AtomicU64>,
// --- Mail pipeline counters ---
mail_processed_total: Arc<AtomicU64>,
mail_delivered_total: Arc<AtomicU64>,
mail_bounced_total: Arc<AtomicU64>,
mail_dropped_total: Arc<AtomicU64>,
// --- Queue: `queue_size` is exported as a gauge, `queue_retries` as a counter ---
queue_size: Arc<AtomicU64>,
queue_retries: Arc<AtomicU64>,
// --- Storage gauges (overwritten by the `set_*` methods) ---
mailboxes_total: Arc<AtomicU64>,
messages_total: Arc<AtomicU64>,
storage_bytes: Arc<AtomicU64>,
// --- Histograms, fed through `Timer` ---
message_processing_latency: Arc<Histogram>,
smtp_session_duration: Arc<Histogram>,
// Gauge keyed by protocol name; signed because it is decremented as well as incremented.
active_connections: Arc<DashMap<String, Arc<AtomicI64>>>,
// Counter keyed by TLS label (see `tls_label`).
tls_sessions_total: Arc<DashMap<String, Arc<AtomicU64>>>,
// Per-domain message counts, written by `inc_`/`set_messages_per_domain`.
messages_per_domain: Arc<DashMap<String, Arc<AtomicU64>>>,
// Optional callback consulted by `refresh_domain_stats_now`.
domain_stats_source: Arc<Mutex<Option<DomainStatsSource>>>,
}
impl std::fmt::Debug for MetricsCollector {
    /// Prints a compact summary: the SMTP connection total and the number of
    /// distinct labels in each labelled family, not the full metric set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let smtp_total = self.smtp_connections_total.load(Ordering::Relaxed);
        let protocol_labels = self.active_connections.len();
        let tls_labels = self.tls_sessions_total.len();
        let domain_labels = self.messages_per_domain.len();

        let mut summary = f.debug_struct("MetricsCollector");
        summary.field("smtp_connections_total", &smtp_total);
        summary.field("active_connections_protocols", &protocol_labels);
        summary.field("tls_label_count", &tls_labels);
        summary.field("domain_label_count", &domain_labels);
        summary.finish()
    }
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl MetricsCollector {
pub fn new() -> Self {
let latency_buckets = vec![
0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];
let duration_buckets = vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0];
Self {
smtp_connections_total: Arc::new(AtomicU64::new(0)),
smtp_messages_received: Arc::new(AtomicU64::new(0)),
smtp_messages_sent: Arc::new(AtomicU64::new(0)),
smtp_errors: Arc::new(AtomicU64::new(0)),
smtp_auth_success_total: Arc::new(AtomicU64::new(0)),
smtp_auth_failure_total: Arc::new(AtomicU64::new(0)),
smtp_messages_rejected_total: Arc::new(AtomicU64::new(0)),
smtp_connections_rejected_blocked: Arc::new(AtomicU64::new(0)),
smtp_connections_rejected_overload: Arc::new(AtomicU64::new(0)),
imap_connections_total: Arc::new(AtomicU64::new(0)),
imap_commands_total: Arc::new(AtomicU64::new(0)),
imap_errors: Arc::new(AtomicU64::new(0)),
jmap_requests_total: Arc::new(AtomicU64::new(0)),
jmap_errors: Arc::new(AtomicU64::new(0)),
push_deliveries_total: Arc::new(AtomicU64::new(0)),
push_delivery_failures_total: Arc::new(AtomicU64::new(0)),
mail_processed_total: Arc::new(AtomicU64::new(0)),
mail_delivered_total: Arc::new(AtomicU64::new(0)),
mail_bounced_total: Arc::new(AtomicU64::new(0)),
mail_dropped_total: Arc::new(AtomicU64::new(0)),
queue_size: Arc::new(AtomicU64::new(0)),
queue_retries: Arc::new(AtomicU64::new(0)),
mailboxes_total: Arc::new(AtomicU64::new(0)),
messages_total: Arc::new(AtomicU64::new(0)),
storage_bytes: Arc::new(AtomicU64::new(0)),
message_processing_latency: Arc::new(Histogram::new(latency_buckets)),
smtp_session_duration: Arc::new(Histogram::new(duration_buckets)),
active_connections: Arc::new(DashMap::new()),
tls_sessions_total: Arc::new(DashMap::new()),
messages_per_domain: Arc::new(DashMap::new()),
domain_stats_source: Arc::new(Mutex::new(None)),
}
}
pub fn record_mail_completed(&self, _mail: &Mail) {
self.inc_mail_processed();
self.inc_mail_delivered();
}
pub fn inc_smtp_connections(&self) {
self.smtp_connections_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_messages_received(&self) {
self.smtp_messages_received.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_messages_sent(&self) {
self.smtp_messages_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_errors(&self) {
self.smtp_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_auth_success(&self) {
self.smtp_auth_success_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_auth_failure(&self) {
self.smtp_auth_failure_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_smtp_messages_rejected(&self) {
self.smtp_messages_rejected_total
.fetch_add(1, Ordering::Relaxed);
}
pub fn smtp_auth_success_count(&self) -> u64 {
self.smtp_auth_success_total.load(Ordering::Relaxed)
}
pub fn smtp_auth_failure_count(&self) -> u64 {
self.smtp_auth_failure_total.load(Ordering::Relaxed)
}
pub fn smtp_messages_rejected_count(&self) -> u64 {
self.smtp_messages_rejected_total.load(Ordering::Relaxed)
}
pub fn inc_smtp_connections_rejected_blocked(&self) {
self.smtp_connections_rejected_blocked
.fetch_add(1, Ordering::Relaxed);
}
pub fn smtp_connections_rejected_blocked_count(&self) -> u64 {
self.smtp_connections_rejected_blocked
.load(Ordering::Relaxed)
}
pub fn inc_smtp_connections_rejected_overload(&self) {
self.smtp_connections_rejected_overload
.fetch_add(1, Ordering::Relaxed);
}
pub fn smtp_connections_rejected_overload_count(&self) -> u64 {
self.smtp_connections_rejected_overload
.load(Ordering::Relaxed)
}
pub fn smtp_messages_accepted_count(&self) -> u64 {
self.smtp_messages_received.load(Ordering::Relaxed)
}
pub fn smtp_connections_count(&self) -> u64 {
self.smtp_connections_total.load(Ordering::Relaxed)
}
pub fn inc_imap_connections(&self) {
self.imap_connections_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_imap_commands(&self) {
self.imap_commands_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_imap_errors(&self) {
self.imap_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_jmap_requests(&self) {
self.jmap_requests_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_jmap_errors(&self) {
self.jmap_errors.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_push_deliveries(&self) {
self.push_deliveries_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_push_delivery_failures(&self) {
self.push_delivery_failures_total
.fetch_add(1, Ordering::Relaxed);
}
pub fn push_deliveries_count(&self) -> u64 {
self.push_deliveries_total.load(Ordering::Relaxed)
}
pub fn push_delivery_failures_count(&self) -> u64 {
self.push_delivery_failures_total.load(Ordering::Relaxed)
}
pub fn inc_mail_processed(&self) {
self.mail_processed_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_mail_delivered(&self) {
self.mail_delivered_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_mail_bounced(&self) {
self.mail_bounced_total.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_mail_dropped(&self) {
self.mail_dropped_total.fetch_add(1, Ordering::Relaxed);
}
pub fn set_queue_size(&self, size: u64) {
self.queue_size.store(size, Ordering::Relaxed);
}
pub fn inc_queue_retries(&self) {
self.queue_retries.fetch_add(1, Ordering::Relaxed);
}
pub fn set_mailboxes_total(&self, count: u64) {
self.mailboxes_total.store(count, Ordering::Relaxed);
}
pub fn set_messages_total(&self, count: u64) {
self.messages_total.store(count, Ordering::Relaxed);
}
pub fn set_storage_bytes(&self, bytes: u64) {
self.storage_bytes.store(bytes, Ordering::Relaxed);
}
pub fn start_message_processing_timer(&self) -> Timer {
Timer::new(Arc::clone(&self.message_processing_latency))
}
pub fn start_smtp_session_timer(&self) -> Timer {
Timer::new(Arc::clone(&self.smtp_session_duration))
}
pub fn inc_active_connections(&self, protocol: &str) {
let entry = self
.active_connections
.entry(protocol.to_owned())
.or_insert_with(|| Arc::new(AtomicI64::new(0)));
entry.fetch_add(1, Ordering::Relaxed);
}
pub fn dec_active_connections(&self, protocol: &str) {
let entry = self
.active_connections
.entry(protocol.to_owned())
.or_insert_with(|| Arc::new(AtomicI64::new(0)));
entry.fetch_sub(1, Ordering::Relaxed);
}
pub fn active_connections(&self, protocol: &str) -> i64 {
self.active_connections
.get(protocol)
.map(|v| v.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn connection_guard(&self, protocol: &str) -> ConnectionGuard {
self.inc_active_connections(protocol);
ConnectionGuard {
metrics: self.clone(),
protocol: protocol.to_owned(),
}
}
pub fn inc_tls_session(&self, tls_kind: &str) {
let entry = self
.tls_sessions_total
.entry(tls_kind.to_owned())
.or_insert_with(|| Arc::new(AtomicU64::new(0)));
entry.fetch_add(1, Ordering::Relaxed);
}
pub fn tls_session_count(&self, tls_kind: &str) -> u64 {
self.tls_sessions_total
.get(tls_kind)
.map(|v| v.load(Ordering::Relaxed))
.unwrap_or(0)
}
pub fn set_messages_per_domain(&self, domain: &str, count: u64) {
let entry = self
.messages_per_domain
.entry(domain.to_owned())
.or_insert_with(|| Arc::new(AtomicU64::new(0)));
entry.store(count, Ordering::Relaxed);
}
pub fn inc_messages_per_domain(&self, domain: &str) {
let entry = self
.messages_per_domain
.entry(domain.to_owned())
.or_insert_with(|| Arc::new(AtomicU64::new(0)));
entry.fetch_add(1, Ordering::Relaxed);
}
pub fn messages_per_domain(&self) -> HashMap<String, u64> {
self.messages_per_domain
.iter()
.map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
.collect()
}
pub fn set_domain_stats_source(&self, source: DomainStatsSource) {
if let Ok(mut guard) = self.domain_stats_source.lock() {
*guard = Some(source);
}
}
pub fn spawn_domain_stats_refresher(&self, period: Duration) -> tokio::task::JoinHandle<()> {
let collector = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
collector.refresh_domain_stats_now();
}
})
}
pub fn refresh_domain_stats_now(&self) {
let snapshot = match self.domain_stats_source.lock() {
Ok(guard) => match guard.as_ref() {
Some(src) => src(),
None => return,
},
Err(_) => return,
};
for (domain, count) in snapshot {
self.set_messages_per_domain(&domain, count);
}
}
pub fn export_prometheus(&self) -> String {
let mut output = String::new();
output.push_str("# HELP rusmes_smtp_connections_total Total SMTP connections\n");
output.push_str("# TYPE rusmes_smtp_connections_total counter\n");
output.push_str(&format!(
"rusmes_smtp_connections_total {}\n",
self.smtp_connections_total.load(Ordering::Relaxed)
));
output
.push_str("# HELP rusmes_smtp_messages_received_total Total SMTP messages received\n");
output.push_str("# TYPE rusmes_smtp_messages_received_total counter\n");
output.push_str(&format!(
"rusmes_smtp_messages_received_total {}\n",
self.smtp_messages_received.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_messages_sent_total Total SMTP messages sent\n");
output.push_str("# TYPE rusmes_smtp_messages_sent_total counter\n");
output.push_str(&format!(
"rusmes_smtp_messages_sent_total {}\n",
self.smtp_messages_sent.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_errors_total Total SMTP errors\n");
output.push_str("# TYPE rusmes_smtp_errors_total counter\n");
output.push_str(&format!(
"rusmes_smtp_errors_total {}\n",
self.smtp_errors.load(Ordering::Relaxed)
));
output.push_str(
"# HELP rusmes_smtp_auth_success_total Total successful SMTP AUTH exchanges\n",
);
output.push_str("# TYPE rusmes_smtp_auth_success_total counter\n");
output.push_str(&format!(
"rusmes_smtp_auth_success_total {}\n",
self.smtp_auth_success_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_auth_failure_total Total failed SMTP AUTH exchanges\n");
output.push_str("# TYPE rusmes_smtp_auth_failure_total counter\n");
output.push_str(&format!(
"rusmes_smtp_auth_failure_total {}\n",
self.smtp_auth_failure_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_messages_rejected_total Total SMTP messages rejected due to size limit exceeded during DATA\n");
output.push_str("# TYPE rusmes_smtp_messages_rejected_total counter\n");
output.push_str(&format!(
"rusmes_smtp_messages_rejected_total {}\n",
self.smtp_messages_rejected_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_connections_rejected_blocked_total Total SMTP connections rejected due to blocked IP\n");
output.push_str("# TYPE rusmes_smtp_connections_rejected_blocked_total counter\n");
output.push_str(&format!(
"rusmes_smtp_connections_rejected_blocked_total {}\n",
self.smtp_connections_rejected_blocked
.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_smtp_connections_rejected_overload_total Total SMTP connections rejected due to per-IP connection cap exceeded\n");
output.push_str("# TYPE rusmes_smtp_connections_rejected_overload_total counter\n");
output.push_str(&format!(
"rusmes_smtp_connections_rejected_overload_total {}\n",
self.smtp_connections_rejected_overload
.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_imap_connections_total Total IMAP connections\n");
output.push_str("# TYPE rusmes_imap_connections_total counter\n");
output.push_str(&format!(
"rusmes_imap_connections_total {}\n",
self.imap_connections_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_imap_commands_total Total IMAP commands\n");
output.push_str("# TYPE rusmes_imap_commands_total counter\n");
output.push_str(&format!(
"rusmes_imap_commands_total {}\n",
self.imap_commands_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_imap_errors_total Total IMAP errors\n");
output.push_str("# TYPE rusmes_imap_errors_total counter\n");
output.push_str(&format!(
"rusmes_imap_errors_total {}\n",
self.imap_errors.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_jmap_requests_total Total JMAP requests\n");
output.push_str("# TYPE rusmes_jmap_requests_total counter\n");
output.push_str(&format!(
"rusmes_jmap_requests_total {}\n",
self.jmap_requests_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_jmap_errors_total Total JMAP errors\n");
output.push_str("# TYPE rusmes_jmap_errors_total counter\n");
output.push_str(&format!(
"rusmes_jmap_errors_total {}\n",
self.jmap_errors.load(Ordering::Relaxed)
));
output
.push_str("# HELP rusmes_push_deliveries_total Total successful WebPush deliveries\n");
output.push_str("# TYPE rusmes_push_deliveries_total counter\n");
output.push_str(&format!(
"rusmes_push_deliveries_total {}\n",
self.push_deliveries_total.load(Ordering::Relaxed)
));
output.push_str(
"# HELP rusmes_push_delivery_failures_total Total WebPush final delivery failures\n",
);
output.push_str("# TYPE rusmes_push_delivery_failures_total counter\n");
output.push_str(&format!(
"rusmes_push_delivery_failures_total {}\n",
self.push_delivery_failures_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_mail_processed_total Total mail processed\n");
output.push_str("# TYPE rusmes_mail_processed_total counter\n");
output.push_str(&format!(
"rusmes_mail_processed_total {}\n",
self.mail_processed_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_mail_delivered_total Total mail delivered\n");
output.push_str("# TYPE rusmes_mail_delivered_total counter\n");
output.push_str(&format!(
"rusmes_mail_delivered_total {}\n",
self.mail_delivered_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_mail_bounced_total Total mail bounced\n");
output.push_str("# TYPE rusmes_mail_bounced_total counter\n");
output.push_str(&format!(
"rusmes_mail_bounced_total {}\n",
self.mail_bounced_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_mail_dropped_total Total mail dropped\n");
output.push_str("# TYPE rusmes_mail_dropped_total counter\n");
output.push_str(&format!(
"rusmes_mail_dropped_total {}\n",
self.mail_dropped_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_queue_size Current queue size\n");
output.push_str("# TYPE rusmes_queue_size gauge\n");
output.push_str(&format!(
"rusmes_queue_size {}\n",
self.queue_size.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_queue_retries_total Total queue retries\n");
output.push_str("# TYPE rusmes_queue_retries_total counter\n");
output.push_str(&format!(
"rusmes_queue_retries_total {}\n",
self.queue_retries.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_mailboxes_total Total mailboxes\n");
output.push_str("# TYPE rusmes_mailboxes_total gauge\n");
output.push_str(&format!(
"rusmes_mailboxes_total {}\n",
self.mailboxes_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_messages_total Total messages\n");
output.push_str("# TYPE rusmes_messages_total gauge\n");
output.push_str(&format!(
"rusmes_messages_total {}\n",
self.messages_total.load(Ordering::Relaxed)
));
output.push_str("# HELP rusmes_storage_bytes Total storage bytes\n");
output.push_str("# TYPE rusmes_storage_bytes gauge\n");
output.push_str(&format!(
"rusmes_storage_bytes {}\n",
self.storage_bytes.load(Ordering::Relaxed)
));
output.push_str(&self.message_processing_latency.export(
"rusmes_message_processing_latency_seconds",
"Message processing latency in seconds",
));
output.push_str(&self.smtp_session_duration.export(
"rusmes_smtp_session_duration_seconds",
"SMTP session duration in seconds",
));
output.push_str(
"# HELP rusmes_active_connections Currently open client connections per protocol\n",
);
output.push_str("# TYPE rusmes_active_connections gauge\n");
let mut active: Vec<(String, i64)> = self
.active_connections
.iter()
.map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
.collect();
active.sort_by(|a, b| a.0.cmp(&b.0));
for (protocol, value) in active {
output.push_str(&format!(
"rusmes_active_connections{{protocol=\"{}\"}} {}\n",
escape_label_value(&protocol),
value
));
}
output.push_str(
"# HELP rusmes_tls_sessions_total Total client sessions seen, partitioned by TLS state\n",
);
output.push_str("# TYPE rusmes_tls_sessions_total counter\n");
let mut tls: Vec<(String, u64)> = self
.tls_sessions_total
.iter()
.map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
.collect();
tls.sort_by(|a, b| a.0.cmp(&b.0));
for (label, value) in tls {
output.push_str(&format!(
"rusmes_tls_sessions_total{{tls=\"{}\"}} {}\n",
escape_label_value(&label),
value
));
}
self.refresh_domain_stats_now();
output.push_str(
"# HELP rusmes_messages_per_domain_total Total messages enqueued per recipient domain\n",
);
output.push_str("# TYPE rusmes_messages_per_domain_total counter\n");
let mut domains: Vec<(String, u64)> = self
.messages_per_domain
.iter()
.map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
.collect();
domains.sort_by(|a, b| a.0.cmp(&b.0));
for (domain, value) in domains {
output.push_str(&format!(
"rusmes_messages_per_domain_total{{domain=\"{}\"}} {}\n",
escape_label_value(&domain),
value
));
}
output
}
pub async fn start_http_server(self, config: MetricsConfig) -> anyhow::Result<()> {
if !config.enabled {
eprintln!("Metrics HTTP server is disabled");
return Ok(());
}
config.validate_bind_address()?;
config.validate_path()?;
let bind_address = config.bind_address.clone();
let metrics_path = config.path.clone();
let basic_auth = config.basic_auth.clone();
let app = self.build_router(&metrics_path, basic_auth);
eprintln!(
"Starting metrics HTTP server on {}{}",
bind_address, metrics_path
);
eprintln!("Health check endpoints: /health, /ready, /live");
let listener = TcpListener::bind(&bind_address).await?;
axum::serve(listener, app).await?;
Ok(())
}
pub fn build_router(
self,
metrics_path: &str,
basic_auth: Option<MetricsBasicAuthConfig>,
) -> Router {
let metrics = Arc::new(Mutex::new(self));
let metrics_handler = {
let metrics = Arc::clone(&metrics);
move || {
let metrics = Arc::clone(&metrics);
async move {
let collector = match metrics.lock() {
Ok(guard) => guard,
Err(e) => {
::tracing::error!("Metrics mutex poisoned: {e}");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
let output = collector.export_prometheus();
(
[(header::CONTENT_TYPE, "text/plain; version=0.0.4")],
output,
)
.into_response()
}
}
};
let mut metrics_router = Router::new().route(metrics_path, get(metrics_handler));
if let Some(auth) = basic_auth {
let state = Arc::new(BasicAuthState { config: auth });
metrics_router =
metrics_router.layer(middleware::from_fn_with_state(state, basic_auth_middleware));
}
let health_router = create_health_router();
Router::new().merge(metrics_router).merge(health_router)
}
}
/// State handed to `basic_auth_middleware`: the credentials a request must
/// present to reach the metrics route.
#[derive(Clone)]
struct BasicAuthState {
config: MetricsBasicAuthConfig,
}
/// Axum middleware enforcing HTTP basic auth.
///
/// Requests whose `Authorization` header passes `verify_basic_auth` are
/// forwarded to `next`; all others receive `401 Unauthorized` with a
/// `WWW-Authenticate` challenge and a short plain-text body.
async fn basic_auth_middleware(
    State(state): State<Arc<BasicAuthState>>,
    request: Request,
    next: Next,
) -> Response {
    let authorised = verify_basic_auth(
        request.headers().get(header::AUTHORIZATION),
        &state.config,
    );
    if authorised {
        return next.run(request).await;
    }

    let mut denied = StatusCode::UNAUTHORIZED.into_response();
    denied.headers_mut().insert(
        header::WWW_AUTHENTICATE,
        HeaderValue::from_static("Basic realm=\"rusmes-metrics\", charset=\"UTF-8\""),
    );
    *denied.body_mut() = Body::from("401 Unauthorized: metrics endpoint requires basic auth");
    denied
}
/// Checks an `Authorization` header against the configured basic-auth
/// credentials.
///
/// Returns `true` only when the header is present, uses the `Basic` scheme
/// (matched case-insensitively), carries valid base64 of a UTF-8
/// `user:password` pair, the user equals `config.username`, and the password
/// verifies against the bcrypt hash in `config.password_hash`. The password
/// is everything after the first `:`, so it may itself contain colons.
///
/// A bcrypt error (e.g. a malformed configured hash) is logged and treated
/// as a failed login.
fn verify_basic_auth(header_value: Option<&HeaderValue>, config: &MetricsBasicAuthConfig) -> bool {
    let Some(value) = header_value.and_then(|v| v.to_str().ok()) else {
        return false;
    };
    let Some((scheme, encoded)) = value.split_once(' ') else {
        return false;
    };
    // Authentication scheme names are case-insensitive, so `basic` and
    // `BASIC` must be accepted as well as `Basic`.
    if !scheme.eq_ignore_ascii_case("Basic") {
        return false;
    }
    let Ok(decoded) = BASE64.decode(encoded.trim()) else {
        return false;
    };
    let Ok(credentials) = std::str::from_utf8(&decoded) else {
        return false;
    };
    let Some((user, password)) = credentials.split_once(':') else {
        return false;
    };

    let user_ok = bytes_eq_constant_time(user.as_bytes(), config.username.as_bytes());
    // Always run bcrypt, even when the username is already known to be wrong.
    // Returning early would make wrong-username requests measurably faster
    // than wrong-password ones and let an attacker enumerate valid usernames.
    let password_ok = match bcrypt::verify(password, &config.password_hash) {
        Ok(ok) => ok,
        Err(e) => {
            ::tracing::warn!(
                "bcrypt verify failed for metrics basic auth (likely malformed hash in config): {e}"
            );
            false
        }
    };
    // Non-short-circuiting `&`: both operands are already computed, and this
    // avoids reintroducing a data-dependent branch.
    user_ok & password_ok
}
/// Compares two byte slices without short-circuiting on the first mismatch.
///
/// Slices of different length are rejected immediately; for equal lengths
/// every byte pair is XOR-ed and the differences OR-ed together, so the loop
/// always runs over the full length.
fn bytes_eq_constant_time(a: &[u8], b: &[u8]) -> bool {
    if a.len() == b.len() {
        let accumulated = a
            .iter()
            .zip(b)
            .fold(0u8, |seen, (lhs, rhs)| seen | (lhs ^ rhs));
        accumulated == 0
    } else {
        false
    }
}
/// Escapes a string for use as a Prometheus label value: backslash becomes
/// `\\`, double quote becomes `\"`, and line feed becomes `\n`. All other
/// characters are copied unchanged.
fn escape_label_value(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            if ch == '\\' {
                escaped.push_str("\\\\");
            } else if ch == '"' {
                escaped.push_str("\\\"");
            } else if ch == '\n' {
                escaped.push_str("\\n");
            } else {
                escaped.push(ch);
            }
            escaped
        })
}
/// JSON body returned by the `/health` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct HealthResponse {
/// Overall verdict: `"healthy"` when every check is healthy, else `"unhealthy"`.
pub status: String,
/// Individual check results.
pub checks: HealthChecks,
}
/// Per-subsystem status strings reported inside `HealthResponse`.
#[derive(Debug, Serialize, Clone)]
pub struct HealthChecks {
/// Result of `check_storage`.
pub storage: String,
/// Result of `check_queue`.
pub queue: String,
}
/// JSON body returned by the `/ready` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct ReadyResponse {
/// Readiness flag reported to the caller.
pub ready: bool,
}
/// JSON body returned by the `/live` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct LiveResponse {
/// Liveness flag reported to the caller.
pub alive: bool,
}
async fn health_check() -> (StatusCode, Json<HealthResponse>) {
let storage_status = check_storage().await;
let queue_status = check_queue().await;
let all_healthy = storage_status == "healthy" && queue_status == "healthy";
let status_code = if all_healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let response = HealthResponse {
status: if all_healthy {
"healthy".to_string()
} else {
"unhealthy".to_string()
},
checks: HealthChecks {
storage: storage_status,
queue: queue_status,
},
};
(status_code, Json(response))
}
/// `/ready` handler. Readiness is currently hard-coded to `true`, so this
/// always answers `200 OK`; the `503` branch is kept for when a real
/// readiness signal is wired in.
async fn readiness_check() -> (StatusCode, Json<ReadyResponse>) {
    let ready = true;
    let status_code = match ready {
        true => StatusCode::OK,
        false => StatusCode::SERVICE_UNAVAILABLE,
    };
    let body = ReadyResponse { ready };
    (status_code, Json(body))
}
/// `/live` handler: unconditionally answers `200 OK` with `alive: true`.
async fn liveness_check() -> (StatusCode, Json<LiveResponse>) {
    let body = LiveResponse { alive: true };
    (StatusCode::OK, Json(body))
}
/// Storage health probe used by `health_check`.
///
/// Currently a stub: it performs no actual check and always reports
/// `"healthy"`.
async fn check_storage() -> String {
"healthy".to_string()
}
/// Queue health probe used by `health_check`.
///
/// Currently a stub: it performs no actual check and always reports
/// `"healthy"`.
async fn check_queue() -> String {
"healthy".to_string()
}
/// Builds a router exposing the three probe endpoints via `GET`:
/// `/health`, `/ready` and `/live`.
pub fn create_health_router() -> Router {
    let router = Router::new();
    let router = router.route("/health", get(health_check));
    let router = router.route("/ready", get(readiness_check));
    router.route("/live", get(liveness_check))
}
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request as HttpRequest;
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    /// Builds a router serving `/metrics`, optionally protected by basic auth
    /// with the given `(username, password)` pair.
    fn router_with_basic_auth(creds: Option<(&str, &str)>) -> Router {
        let auth = creds.map(|(u, p)| MetricsBasicAuthConfig {
            username: u.to_string(),
            password_hash: bcrypt::hash(p, 4).expect("bcrypt hash for test"),
        });
        MetricsCollector::new().build_router("/metrics", auth)
    }

    /// Sends `GET /metrics` to `app`, optionally with an `Authorization` header.
    async fn get_metrics(app: Router, authorization: Option<String>) -> Response {
        let mut builder = HttpRequest::builder().uri("/metrics");
        if let Some(value) = authorization {
            builder = builder.header(header::AUTHORIZATION, value);
        }
        let request = builder.body(Body::empty()).expect("request build");
        app.oneshot(request).await.expect("router call")
    }

    /// Collects a response body and decodes it as UTF-8.
    async fn body_text(resp: Response) -> String {
        let bytes = resp
            .into_body()
            .collect()
            .await
            .expect("collect")
            .to_bytes();
        std::str::from_utf8(&bytes)
            .expect("utf-8 body")
            .to_owned()
    }

    /// Formats `user:password` as a `Basic …` header value.
    fn basic_header(raw: &[u8]) -> String {
        let encoded = BASE64.encode(raw);
        format!("Basic {encoded}")
    }

    #[test]
    fn test_metrics_collector() {
        let metrics = MetricsCollector::new();
        metrics.inc_smtp_connections();
        metrics.inc_smtp_messages_received();
        metrics.inc_mail_processed();
        metrics.inc_mail_delivered();

        let touched = [
            &metrics.smtp_connections_total,
            &metrics.smtp_messages_received,
            &metrics.mail_processed_total,
            &metrics.mail_delivered_total,
        ];
        for cell in touched {
            assert_eq!(cell.load(Ordering::Relaxed), 1);
        }
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = MetricsCollector::new();
        metrics.inc_smtp_connections();
        metrics.set_queue_size(42);

        let output = metrics.export_prometheus();
        for needle in [
            "rusmes_smtp_connections_total 1",
            "rusmes_queue_size 42",
            "# HELP",
            "# TYPE",
        ] {
            assert!(output.contains(needle));
        }
    }

    #[test]
    fn test_active_connections_guard_roundtrip() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("smtp"), 0);

        let outer = metrics.connection_guard("smtp");
        assert_eq!(metrics.active_connections("smtp"), 1);
        let inner = metrics.connection_guard("smtp");
        assert_eq!(metrics.active_connections("smtp"), 2);
        drop(inner);
        assert_eq!(metrics.active_connections("smtp"), 1);
        drop(outer);
        assert_eq!(metrics.active_connections("smtp"), 0);

        let _imap = metrics.connection_guard("imap");
        assert_eq!(metrics.active_connections("imap"), 1);
        assert_eq!(metrics.active_connections("smtp"), 0);
    }

    #[test]
    fn test_tls_session_counter_labels() {
        let metrics = MetricsCollector::new();
        for label in [
            tls_label::NO,
            tls_label::NO,
            tls_label::STARTTLS,
            tls_label::YES,
        ] {
            metrics.inc_tls_session(label);
        }

        assert_eq!(metrics.tls_session_count(tls_label::NO), 2);
        assert_eq!(metrics.tls_session_count(tls_label::STARTTLS), 1);
        assert_eq!(metrics.tls_session_count(tls_label::YES), 1);

        let exp = metrics.export_prometheus();
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"no\"} 2"));
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"starttls\"} 1"));
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"yes\"} 1"));
        assert!(exp.contains("# TYPE rusmes_tls_sessions_total counter"));
    }

    #[test]
    fn test_messages_per_domain_from_callback_source() {
        let metrics = MetricsCollector::new();
        metrics.set_domain_stats_source(Arc::new(|| {
            HashMap::from([
                ("example.com".to_string(), 5u64),
                ("example.org".to_string(), 3u64),
            ])
        }));

        let exp = metrics.export_prometheus();
        assert!(
            exp.contains("rusmes_messages_per_domain_total{domain=\"example.com\"} 5"),
            "exposition was:\n{exp}"
        );
        assert!(exp.contains("rusmes_messages_per_domain_total{domain=\"example.org\"} 3"));
        assert!(exp.contains("# TYPE rusmes_messages_per_domain_total counter"));
    }

    #[test]
    fn test_escape_label_value_quotes_and_backslash() {
        let cases = [
            ("plain", "plain"),
            ("a\"b", "a\\\"b"),
            ("a\\b", "a\\\\b"),
            ("a\nb", "a\\nb"),
        ];
        for (raw, escaped) in cases {
            assert_eq!(escape_label_value(raw), escaped);
        }
    }

    #[test]
    fn test_constant_time_eq() {
        assert!(bytes_eq_constant_time(b"abc", b"abc"));
        assert!(!bytes_eq_constant_time(b"abc", b"abd"));
        assert!(!bytes_eq_constant_time(b"abc", b"abcd"));
        assert!(bytes_eq_constant_time(b"", b""));
    }

    #[tokio::test]
    async fn test_metrics_basic_auth_accepts_correct_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        // Without credentials the endpoint must challenge the client.
        let resp = get_metrics(app.clone(), None).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
        let www_auth = resp
            .headers()
            .get(header::WWW_AUTHENTICATE)
            .expect("WWW-Authenticate header on 401")
            .to_str()
            .expect("ascii header");
        assert!(www_auth.starts_with("Basic realm="), "got: {www_auth}");

        // With the right credentials the exposition is served.
        let resp = get_metrics(app.clone(), Some(basic_header(b"scrape:s3cret"))).await;
        assert_eq!(resp.status(), StatusCode::OK);
        let body_text = body_text(resp).await;
        assert!(body_text.contains("# HELP"), "body was:\n{body_text}");
    }

    #[tokio::test]
    async fn test_metrics_basic_auth_rejects_wrong_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        let resp = get_metrics(app.clone(), Some(basic_header(b"wrong:s3cret"))).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);

        let resp = get_metrics(app, Some(basic_header(b"scrape:wrong"))).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn test_metrics_no_basic_auth_serves_anonymously() {
        let app = router_with_basic_auth(None);
        let resp = get_metrics(app, None).await;
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[test]
    fn test_connection_guard_increments_and_decrements() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("pop3"), 0);
        let guard = metrics.connection_guard("pop3");
        assert_eq!(metrics.active_connections("pop3"), 1);
        drop(guard);
        assert_eq!(metrics.active_connections("pop3"), 0);
    }

    #[test]
    fn test_connection_metrics_total() {
        let metrics = MetricsCollector::new();
        let _g1 = metrics.connection_guard("smtp");
        let _g2 = metrics.connection_guard("imap");
        let _g3 = metrics.connection_guard("pop3");
        for protocol in ["smtp", "imap", "pop3"] {
            assert_eq!(metrics.active_connections(protocol), 1);
        }
    }

    #[test]
    fn test_prometheus_format() {
        let metrics = MetricsCollector::new();
        let _g = metrics.connection_guard("smtp");
        let output = metrics.export_prometheus();
        assert!(
            output.contains("rusmes_active_connections{protocol=\"smtp\"} 1"),
            "prometheus output was:\n{output}"
        );
        assert!(output.contains("# TYPE rusmes_active_connections gauge"));
    }

    #[tokio::test]
    async fn test_metrics_server_responds() {
        let metrics = MetricsCollector::new();
        let _g = metrics.connection_guard("smtp");
        let app = metrics.build_router("/metrics", None);

        let resp = get_metrics(app, None).await;
        assert_eq!(resp.status(), StatusCode::OK);
        let body_text = body_text(resp).await;
        assert!(
            body_text.contains("rusmes_active_connections"),
            "body was:\n{body_text}"
        );
    }

    #[test]
    fn test_global_metrics_singleton() {
        let first = global_metrics() as *const _;
        let second = global_metrics() as *const _;
        assert_eq!(first, second, "global_metrics() must return the same instance");
    }
}