Skip to main content

rusmes_metrics/
lib.rs

1//! Observability layer for RusMES
2//!
3//! This crate provides a complete observability stack for the RusMES mail server:
4//!
5//! - **Prometheus-compatible metrics** exported over HTTP (pull-based scraping)
6//! - **OpenTelemetry distributed tracing** via the OTLP exporter (see [`tracing`] module)
7//! - **Kubernetes-compatible health probes** (`/health`, `/ready`, `/live`)
8//! - **Grafana dashboard** support via standard Prometheus metric naming
9//!
10//! # Key Features
11//!
12//! - Counter metrics for SMTP, IMAP, and JMAP protocol operations (connections, messages,
13//!   commands, errors)
14//! - Gauge metrics for queue depth, mailbox count, message count, and storage bytes
15//! - Histogram metrics with carefully chosen bucket boundaries for:
16//!   - **Message processing latency**: 1 ms – 10 s
17//!   - **SMTP session duration**: 100 ms – 600 s
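//! - Lock-free atomic counters (`AtomicU64`) for unlabelled metrics; labelled metrics
//!   (per protocol, TLS mode, and recipient domain) are stored in a sharded `DashMap`
//! - Atomic histogram buckets, sum, and count, so observing a value never takes a lock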
20//! - Integration with `tracing-opentelemetry` for correlating traces and logs
21//!
22//! # Usage
23//!
24//! ```rust,no_run
25//! use rusmes_metrics::MetricsCollector;
26//! use rusmes_config::MetricsConfig;
27//!
28//! # async fn example() -> anyhow::Result<()> {
29//! // Create a shared metrics collector (cheap to clone, backed by Arc)
30//! let metrics = MetricsCollector::new();
31//!
32//! // Increment counters on protocol events
33//! metrics.inc_smtp_connections();
34//! metrics.inc_smtp_messages_received();
35//!
36//! // Time an operation with a histogram
37//! let timer = metrics.start_message_processing_timer();
38//! // ... process message ...
39//! timer.observe();     // records elapsed seconds into the histogram
40//!
41//! // Expose a Prometheus-scrape endpoint
42//! let config = MetricsConfig {
43//!     enabled: true,
44//!     bind_address: "0.0.0.0:9090".to_string(),
45//!     path: "/metrics".to_string(),
46//!     basic_auth: None,
47//! };
48//! metrics.start_http_server(config).await?;
49//! # Ok(())
50//! # }
51//! ```
52//!
53//! # HTTP Endpoints
54//!
55//! | Path        | Description                              |
56//! |-------------|------------------------------------------|
57//! | `/metrics`  | Prometheus text-format metrics           |
58//! | `/health`   | JSON health report with component checks |
59//! | `/ready`    | Kubernetes readiness probe (HTTP 200)    |
60//! | `/live`     | Kubernetes liveness probe (HTTP 200)     |
61//!
62//! ```bash
63//! curl http://localhost:9090/metrics
64//! curl http://localhost:9090/health
65//! curl http://localhost:9090/ready
66//! curl http://localhost:9090/live
67//! ```
68//!
69//! # Histogram Buckets
70//!
71//! - **Message processing latency** (`rusmes_message_processing_latency_seconds`):
72//!   1 ms, 5 ms, 10 ms, 25 ms, 50 ms, 100 ms, 250 ms, 500 ms, 1 s, 2.5 s, 5 s, 10 s
73//! - **SMTP session duration** (`rusmes_smtp_session_duration_seconds`):
74//!   100 ms, 500 ms, 1 s, 5 s, 10 s, 30 s, 60 s, 120 s, 300 s, 600 s
75//!
76//! # OpenTelemetry / Distributed Tracing
77//!
78//! See the [`tracing`] sub-module for span helpers (`smtp_span`, `imap_span`,
79//! `jmap_span`, `mailet_span`, `delivery_span`) and the `init_tracing` function
80//! that wires up an OTLP exporter with configurable gRPC or HTTP transport.
81
82pub mod tracing;
83
84use axum::{
85    body::Body,
86    extract::{Request, State},
87    http::{header, HeaderValue, StatusCode},
88    middleware::{self, Next},
89    response::{IntoResponse, Response},
90    routing::get,
91    Json, Router,
92};
93use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
94use dashmap::DashMap;
95use rusmes_config::{MetricsBasicAuthConfig, MetricsConfig};
96use rusmes_proto::Mail;
97use serde::Serialize;
98use std::collections::HashMap;
99use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
100use std::sync::{Arc, Mutex, OnceLock};
101use std::time::{Duration, Instant};
102use tokio::net::TcpListener;
103
104/// Global metrics collector accessible by every protocol crate without explicit threading.
105///
106/// Set once on server bootstrap via [`set_global_metrics`]; protocol implementations call
107/// [`global_metrics`] to record events. Falls back to a one-time initialised default so
108/// tests and embedders that never call `set_global_metrics` still get a usable collector
109/// (rather than panicking or silently dropping data).
110static GLOBAL_METRICS: OnceLock<MetricsCollector> = OnceLock::new();
111
112/// Error returned by [`set_global_metrics`] when the global has already been initialised.
113///
114/// Cannot store the rejected collector inside the variant because it carries non-`'static`
115/// borrows on the closure-shaped fields (the source callback). The caller is expected
116/// to drop their `MetricsCollector` on receiving this error.
117#[derive(Debug, thiserror::Error)]
118#[error("global MetricsCollector has already been initialised")]
119pub struct GlobalMetricsAlreadySet;
120
121/// Install the process-wide [`MetricsCollector`] so protocol crates can record events
122/// without having to thread the handle through every constructor.
123///
124/// Returns `Err(GlobalMetricsAlreadySet)` if a collector has already been installed —
125/// callers should treat this as a non-fatal warning and continue using the existing
126/// global handle returned by [`global_metrics`].
127pub fn set_global_metrics(collector: MetricsCollector) -> Result<(), GlobalMetricsAlreadySet> {
128    GLOBAL_METRICS
129        .set(collector)
130        .map_err(|_| GlobalMetricsAlreadySet)
131}
132
133/// Get the process-wide metrics collector, lazily installing a fresh one the first time
134/// it is requested if `set_global_metrics` was never called.
135///
136/// Always returns a usable handle — never panics, never returns `None`.
137pub fn global_metrics() -> &'static MetricsCollector {
138    GLOBAL_METRICS.get_or_init(MetricsCollector::new)
139}
140
/// Canonical values for the `tls` label of the `rusmes_tls_sessions_total` counter.
///
/// Pass these constants instead of ad-hoc string literals. That keeps the set of label
/// values small and fixed, and stops a typo in one protocol crate from creating a stray
/// time series.
pub mod tls_label {
    /// Plaintext session that was later upgraded with STARTTLS.
    pub const STARTTLS: &str = "starttls";
    /// Implicit-TLS session: encrypted from the first byte on a TLS-only port.
    pub const YES: &str = "yes";
    /// Plaintext session that was never upgraded.
    pub const NO: &str = "no";
}
153
/// Snapshot provider feeding the per-recipient-domain message counters.
///
/// Each call must build and return a fresh `domain -> count` map. The metrics layer
/// decides when to call it: [`MetricsCollector::refresh_domain_stats_now`] invokes it
/// directly, and so does the background task started by
/// [`MetricsCollector::spawn_domain_stats_refresher`] on every tick.
pub type DomainStatsSource = Arc<dyn Fn() -> HashMap<String, u64> + Sync + Send>;
160
/// Fixed-bucket histogram rendered in the Prometheus text exposition format.
///
/// Every field is atomic, so [`Histogram::observe`] never takes a lock. Bucket counts
/// are cumulative: an observation increments every bucket whose upper bound it does not
/// exceed, matching the Prometheus `le` convention. The derived `Clone` copies the `Arc`s,
/// so clones share the same counters.
#[derive(Debug, Clone)]
struct Histogram {
    /// Upper bounds (`le`) of each bucket, in seconds. Callers pass them in ascending
    /// order, as the exposition format expects.
    buckets: Vec<f64>,
    /// Cumulative observation count for the bucket at the same index in `buckets`.
    counts: Vec<Arc<AtomicU64>>,
    /// Running sum of all observed values, in whole microseconds.
    sum: Arc<AtomicU64>,
    /// Total number of observations; also exported as the implicit `+Inf` bucket.
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Units stored in `sum` per second of observed value (microsecond resolution).
    ///
    /// Microseconds keep sub-millisecond observations visible in `_sum`. The previous
    /// whole-millisecond storage recorded them as 0. A `u64` of microseconds still covers
    /// ~584,000 years of cumulative duration before wrapping.
    const SUM_UNITS_PER_SECOND: f64 = 1_000_000.0;

    /// Create a histogram with the given bucket upper bounds (seconds), all counts zero.
    fn new(buckets: Vec<f64>) -> Self {
        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets,
            counts,
            sum: Arc::new(AtomicU64::new(0)),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Record one observation of `value` seconds.
    fn observe(&self, value: f64) {
        // Round instead of truncating, so e.g. 0.0029 s is stored as 2900 µs, not 2899.
        // Negative or NaN inputs saturate to 0 in the float -> integer `as` cast.
        let units = (value * Self::SUM_UNITS_PER_SECOND).round() as u64;
        self.sum.fetch_add(units, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Render the histogram as Prometheus text: `# HELP`/`# TYPE` headers, one
    /// `_bucket{le=...}` line per bound, then the `+Inf` bucket, `_sum` (seconds) and
    /// `_count`.
    fn export(&self, name: &str, help: &str) -> String {
        use std::fmt::Write as _;

        let mut output = String::new();
        // Writing into a `String` cannot fail, so the `fmt::Result`s are discarded.
        let _ = writeln!(output, "# HELP {} {}", name, help);
        let _ = writeln!(output, "# TYPE {} histogram", name);

        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            let _ = writeln!(
                output,
                "{}_bucket{{le=\"{}\"}} {}",
                name,
                bound,
                counter.load(Ordering::Relaxed)
            );
        }

        // Load the total once so the `+Inf` bucket and `_count` always agree, as the
        // exposition format requires, even if observations race with the export.
        let total = self.count.load(Ordering::Relaxed);
        let _ = writeln!(output, "{}_bucket{{le=\"+Inf\"}} {}", name, total);
        let _ = writeln!(
            output,
            "{}_sum {}",
            name,
            self.sum.load(Ordering::Relaxed) as f64 / Self::SUM_UNITS_PER_SECOND
        );
        let _ = writeln!(output, "{}_count {}", name, total);

        output
    }
}
225
226/// Timer for tracking operation duration
227pub struct Timer {
228    start: Instant,
229    histogram: Arc<Histogram>,
230}
231
232impl Timer {
233    fn new(histogram: Arc<Histogram>) -> Self {
234        Self {
235            start: Instant::now(),
236            histogram,
237        }
238    }
239
240    pub fn observe(self) {
241        let duration = self.start.elapsed().as_secs_f64();
242        self.histogram.observe(duration);
243    }
244}
245
246/// RAII guard that decrements the active-connections gauge on drop.
247///
248/// Construct via [`MetricsCollector::connection_guard`].
249pub struct ConnectionGuard {
250    metrics: MetricsCollector,
251    protocol: String,
252}
253
254impl Drop for ConnectionGuard {
255    fn drop(&mut self) {
256        self.metrics.dec_active_connections(&self.protocol);
257    }
258}
259
260/// Server metrics collector
261#[derive(Clone)]
262pub struct MetricsCollector {
263    // SMTP metrics
264    smtp_connections_total: Arc<AtomicU64>,
265    smtp_messages_received: Arc<AtomicU64>,
266    smtp_messages_sent: Arc<AtomicU64>,
267    smtp_errors: Arc<AtomicU64>,
268    smtp_auth_success_total: Arc<AtomicU64>,
269    smtp_auth_failure_total: Arc<AtomicU64>,
270    smtp_messages_rejected_total: Arc<AtomicU64>,
271    smtp_connections_rejected_blocked: Arc<AtomicU64>,
272    smtp_connections_rejected_overload: Arc<AtomicU64>,
273
274    // IMAP metrics
275    imap_connections_total: Arc<AtomicU64>,
276    imap_commands_total: Arc<AtomicU64>,
277    imap_errors: Arc<AtomicU64>,
278
279    // JMAP metrics
280    jmap_requests_total: Arc<AtomicU64>,
281    jmap_errors: Arc<AtomicU64>,
282
283    // WebPush delivery metrics
284    push_deliveries_total: Arc<AtomicU64>,
285    push_delivery_failures_total: Arc<AtomicU64>,
286
287    // Mail processing metrics
288    mail_processed_total: Arc<AtomicU64>,
289    mail_delivered_total: Arc<AtomicU64>,
290    mail_bounced_total: Arc<AtomicU64>,
291    mail_dropped_total: Arc<AtomicU64>,
292
293    // Queue metrics
294    queue_size: Arc<AtomicU64>,
295    queue_retries: Arc<AtomicU64>,
296
297    // Storage metrics
298    mailboxes_total: Arc<AtomicU64>,
299    messages_total: Arc<AtomicU64>,
300    storage_bytes: Arc<AtomicU64>,
301
302    // Histograms
303    message_processing_latency: Arc<Histogram>,
304    smtp_session_duration: Arc<Histogram>,
305
306    // Active connections gauge (label: protocol -> live count, may go negative on misuse).
307    active_connections: Arc<DashMap<String, Arc<AtomicI64>>>,
308
309    // TLS sessions counter (label: tls -> total sessions seen).
310    tls_sessions_total: Arc<DashMap<String, Arc<AtomicU64>>>,
311
312    // Per-recipient-domain message counter (label: domain -> count).
313    messages_per_domain: Arc<DashMap<String, Arc<AtomicU64>>>,
314
315    // Optional callback for refreshing the per-domain counters on demand.
316    domain_stats_source: Arc<Mutex<Option<DomainStatsSource>>>,
317}
318
319impl std::fmt::Debug for MetricsCollector {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        f.debug_struct("MetricsCollector")
322            .field(
323                "smtp_connections_total",
324                &self.smtp_connections_total.load(Ordering::Relaxed),
325            )
326            .field(
327                "active_connections_protocols",
328                &self.active_connections.len(),
329            )
330            .field("tls_label_count", &self.tls_sessions_total.len())
331            .field("domain_label_count", &self.messages_per_domain.len())
332            .finish()
333    }
334}
335
336impl Default for MetricsCollector {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
342impl MetricsCollector {
343    /// Create a new metrics collector
344    pub fn new() -> Self {
345        // Define histogram buckets for latency (in seconds)
346        let latency_buckets = vec![
347            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
348        ];
349        // Define histogram buckets for session duration (in seconds)
350        let duration_buckets = vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0];
351
352        Self {
353            smtp_connections_total: Arc::new(AtomicU64::new(0)),
354            smtp_messages_received: Arc::new(AtomicU64::new(0)),
355            smtp_messages_sent: Arc::new(AtomicU64::new(0)),
356            smtp_errors: Arc::new(AtomicU64::new(0)),
357            smtp_auth_success_total: Arc::new(AtomicU64::new(0)),
358            smtp_auth_failure_total: Arc::new(AtomicU64::new(0)),
359            smtp_messages_rejected_total: Arc::new(AtomicU64::new(0)),
360            smtp_connections_rejected_blocked: Arc::new(AtomicU64::new(0)),
361            smtp_connections_rejected_overload: Arc::new(AtomicU64::new(0)),
362            imap_connections_total: Arc::new(AtomicU64::new(0)),
363            imap_commands_total: Arc::new(AtomicU64::new(0)),
364            imap_errors: Arc::new(AtomicU64::new(0)),
365            jmap_requests_total: Arc::new(AtomicU64::new(0)),
366            jmap_errors: Arc::new(AtomicU64::new(0)),
367            push_deliveries_total: Arc::new(AtomicU64::new(0)),
368            push_delivery_failures_total: Arc::new(AtomicU64::new(0)),
369            mail_processed_total: Arc::new(AtomicU64::new(0)),
370            mail_delivered_total: Arc::new(AtomicU64::new(0)),
371            mail_bounced_total: Arc::new(AtomicU64::new(0)),
372            mail_dropped_total: Arc::new(AtomicU64::new(0)),
373            queue_size: Arc::new(AtomicU64::new(0)),
374            queue_retries: Arc::new(AtomicU64::new(0)),
375            mailboxes_total: Arc::new(AtomicU64::new(0)),
376            messages_total: Arc::new(AtomicU64::new(0)),
377            storage_bytes: Arc::new(AtomicU64::new(0)),
378            message_processing_latency: Arc::new(Histogram::new(latency_buckets)),
379            smtp_session_duration: Arc::new(Histogram::new(duration_buckets)),
380            active_connections: Arc::new(DashMap::new()),
381            tls_sessions_total: Arc::new(DashMap::new()),
382            messages_per_domain: Arc::new(DashMap::new()),
383            domain_stats_source: Arc::new(Mutex::new(None)),
384        }
385    }
386
387    /// Record mail completion (compatibility method)
388    pub fn record_mail_completed(&self, _mail: &Mail) {
389        self.inc_mail_processed();
390        self.inc_mail_delivered();
391    }
392
393    // SMTP metrics
394    pub fn inc_smtp_connections(&self) {
395        self.smtp_connections_total.fetch_add(1, Ordering::Relaxed);
396    }
397
398    pub fn inc_smtp_messages_received(&self) {
399        self.smtp_messages_received.fetch_add(1, Ordering::Relaxed);
400    }
401
402    pub fn inc_smtp_messages_sent(&self) {
403        self.smtp_messages_sent.fetch_add(1, Ordering::Relaxed);
404    }
405
406    pub fn inc_smtp_errors(&self) {
407        self.smtp_errors.fetch_add(1, Ordering::Relaxed);
408    }
409
410    /// Increment the SMTP authentication success counter (any mechanism).
411    pub fn inc_smtp_auth_success(&self) {
412        self.smtp_auth_success_total.fetch_add(1, Ordering::Relaxed);
413    }
414
415    /// Increment the SMTP authentication failure counter (wrong credentials or mechanism error).
416    pub fn inc_smtp_auth_failure(&self) {
417        self.smtp_auth_failure_total.fetch_add(1, Ordering::Relaxed);
418    }
419
420    /// Increment the SMTP rejected-message counter (4xx/5xx after DATA, e.g., size exceeded).
421    pub fn inc_smtp_messages_rejected(&self) {
422        self.smtp_messages_rejected_total
423            .fetch_add(1, Ordering::Relaxed);
424    }
425
426    /// Read the SMTP auth success counter (primarily for testing).
427    pub fn smtp_auth_success_count(&self) -> u64 {
428        self.smtp_auth_success_total.load(Ordering::Relaxed)
429    }
430
431    /// Read the SMTP auth failure counter (primarily for testing).
432    pub fn smtp_auth_failure_count(&self) -> u64 {
433        self.smtp_auth_failure_total.load(Ordering::Relaxed)
434    }
435
436    /// Read the SMTP rejected-message counter (primarily for testing).
437    pub fn smtp_messages_rejected_count(&self) -> u64 {
438        self.smtp_messages_rejected_total.load(Ordering::Relaxed)
439    }
440
441    /// Increment the SMTP connections-rejected-blocked-IP counter.
442    pub fn inc_smtp_connections_rejected_blocked(&self) {
443        self.smtp_connections_rejected_blocked
444            .fetch_add(1, Ordering::Relaxed);
445    }
446
447    /// Read the SMTP connections-rejected-blocked counter (primarily for testing).
448    pub fn smtp_connections_rejected_blocked_count(&self) -> u64 {
449        self.smtp_connections_rejected_blocked
450            .load(Ordering::Relaxed)
451    }
452
453    /// Increment the SMTP connections-rejected-overload counter (concurrent-connection cap exceeded).
454    pub fn inc_smtp_connections_rejected_overload(&self) {
455        self.smtp_connections_rejected_overload
456            .fetch_add(1, Ordering::Relaxed);
457    }
458
459    /// Read the SMTP connections-rejected-overload counter (primarily for testing).
460    pub fn smtp_connections_rejected_overload_count(&self) -> u64 {
461        self.smtp_connections_rejected_overload
462            .load(Ordering::Relaxed)
463    }
464
465    /// Read the SMTP accepted-message counter (primarily for testing).
466    pub fn smtp_messages_accepted_count(&self) -> u64 {
467        self.smtp_messages_received.load(Ordering::Relaxed)
468    }
469
470    /// Read the SMTP connections counter (primarily for testing).
471    pub fn smtp_connections_count(&self) -> u64 {
472        self.smtp_connections_total.load(Ordering::Relaxed)
473    }
474
475    // IMAP metrics
476    pub fn inc_imap_connections(&self) {
477        self.imap_connections_total.fetch_add(1, Ordering::Relaxed);
478    }
479
480    pub fn inc_imap_commands(&self) {
481        self.imap_commands_total.fetch_add(1, Ordering::Relaxed);
482    }
483
484    pub fn inc_imap_errors(&self) {
485        self.imap_errors.fetch_add(1, Ordering::Relaxed);
486    }
487
488    // JMAP metrics
489    pub fn inc_jmap_requests(&self) {
490        self.jmap_requests_total.fetch_add(1, Ordering::Relaxed);
491    }
492
493    pub fn inc_jmap_errors(&self) {
494        self.jmap_errors.fetch_add(1, Ordering::Relaxed);
495    }
496
497    /// Increment the WebPush successful-delivery counter.
498    pub fn inc_push_deliveries(&self) {
499        self.push_deliveries_total.fetch_add(1, Ordering::Relaxed);
500    }
501
502    /// Increment the WebPush final-failure counter (all retries exhausted or 410 Gone).
503    pub fn inc_push_delivery_failures(&self) {
504        self.push_delivery_failures_total
505            .fetch_add(1, Ordering::Relaxed);
506    }
507
508    /// Read the push-deliveries counter (primarily for testing).
509    pub fn push_deliveries_count(&self) -> u64 {
510        self.push_deliveries_total.load(Ordering::Relaxed)
511    }
512
513    /// Read the push-delivery-failures counter (primarily for testing).
514    pub fn push_delivery_failures_count(&self) -> u64 {
515        self.push_delivery_failures_total.load(Ordering::Relaxed)
516    }
517
518    // Mail processing metrics
519    pub fn inc_mail_processed(&self) {
520        self.mail_processed_total.fetch_add(1, Ordering::Relaxed);
521    }
522
523    pub fn inc_mail_delivered(&self) {
524        self.mail_delivered_total.fetch_add(1, Ordering::Relaxed);
525    }
526
527    pub fn inc_mail_bounced(&self) {
528        self.mail_bounced_total.fetch_add(1, Ordering::Relaxed);
529    }
530
531    pub fn inc_mail_dropped(&self) {
532        self.mail_dropped_total.fetch_add(1, Ordering::Relaxed);
533    }
534
535    // Queue metrics
536    pub fn set_queue_size(&self, size: u64) {
537        self.queue_size.store(size, Ordering::Relaxed);
538    }
539
540    pub fn inc_queue_retries(&self) {
541        self.queue_retries.fetch_add(1, Ordering::Relaxed);
542    }
543
544    // Storage metrics
545    pub fn set_mailboxes_total(&self, count: u64) {
546        self.mailboxes_total.store(count, Ordering::Relaxed);
547    }
548
549    pub fn set_messages_total(&self, count: u64) {
550        self.messages_total.store(count, Ordering::Relaxed);
551    }
552
553    pub fn set_storage_bytes(&self, bytes: u64) {
554        self.storage_bytes.store(bytes, Ordering::Relaxed);
555    }
556
557    // Histogram metrics
558    pub fn start_message_processing_timer(&self) -> Timer {
559        Timer::new(Arc::clone(&self.message_processing_latency))
560    }
561
562    pub fn start_smtp_session_timer(&self) -> Timer {
563        Timer::new(Arc::clone(&self.smtp_session_duration))
564    }
565
566    // ----- Active connections gauge (rusmes_active_connections{protocol=...}) -----
567
568    /// Increment the active-connections gauge for a protocol (`smtp`, `imap`, `jmap`, `pop3`).
569    pub fn inc_active_connections(&self, protocol: &str) {
570        let entry = self
571            .active_connections
572            .entry(protocol.to_owned())
573            .or_insert_with(|| Arc::new(AtomicI64::new(0)));
574        entry.fetch_add(1, Ordering::Relaxed);
575    }
576
577    /// Decrement the active-connections gauge for a protocol.
578    ///
579    /// Safe to call even if the protocol label has never been seen — it will lazily create
580    /// the entry at zero (and immediately drop to -1, which is a useful diagnostic signal
581    /// rather than a panic).
582    pub fn dec_active_connections(&self, protocol: &str) {
583        let entry = self
584            .active_connections
585            .entry(protocol.to_owned())
586            .or_insert_with(|| Arc::new(AtomicI64::new(0)));
587        entry.fetch_sub(1, Ordering::Relaxed);
588    }
589
590    /// Read the current active-connections gauge for a protocol (0 if never observed).
591    pub fn active_connections(&self, protocol: &str) -> i64 {
592        self.active_connections
593            .get(protocol)
594            .map(|v| v.load(Ordering::Relaxed))
595            .unwrap_or(0)
596    }
597
598    /// Return an RAII guard that increments the gauge on creation and decrements on drop.
599    ///
600    /// Use this from each protocol's connection-acceptance path so the gauge round-trips
601    /// even when the session terminates via `?`, panic, or an early return:
602    ///
603    /// ```rust,ignore
604    /// let _conn_guard = metrics.connection_guard("smtp");
605    /// session.handle().await?;
606    /// // gauge decremented here when guard drops, regardless of outcome
607    /// ```
608    pub fn connection_guard(&self, protocol: &str) -> ConnectionGuard {
609        self.inc_active_connections(protocol);
610        ConnectionGuard {
611            metrics: self.clone(),
612            protocol: protocol.to_owned(),
613        }
614    }
615
616    // ----- TLS counter (rusmes_tls_sessions_total{tls=yes|no|starttls}) -----
617
618    /// Record a session creation under the given TLS label.
619    ///
620    /// `tls_kind` should be one of [`tls_label::YES`], [`tls_label::NO`], or [`tls_label::STARTTLS`].
621    /// Other values are accepted (label cardinality is unbounded only by caller discipline)
622    /// but will produce non-standard label values.
623    pub fn inc_tls_session(&self, tls_kind: &str) {
624        let entry = self
625            .tls_sessions_total
626            .entry(tls_kind.to_owned())
627            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
628        entry.fetch_add(1, Ordering::Relaxed);
629    }
630
631    /// Read the current TLS-sessions counter for a given label.
632    pub fn tls_session_count(&self, tls_kind: &str) -> u64 {
633        self.tls_sessions_total
634            .get(tls_kind)
635            .map(|v| v.load(Ordering::Relaxed))
636            .unwrap_or(0)
637    }
638
639    // ----- Per-domain message counters (rusmes_messages_per_domain_total{domain=...}) -----
640
641    /// Set the absolute count for a recipient domain.
642    ///
643    /// Prefer this when feeding from `rusmes_core::MailQueue::queue_stats_per_domain()` —
644    /// the queue owns the canonical counter and the metrics layer just mirrors the value.
645    pub fn set_messages_per_domain(&self, domain: &str, count: u64) {
646        let entry = self
647            .messages_per_domain
648            .entry(domain.to_owned())
649            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
650        entry.store(count, Ordering::Relaxed);
651    }
652
653    /// Increment the per-domain counter by one. Useful when the metrics layer is the
654    /// counter of record (i.e. when there is no upstream queue snapshot).
655    pub fn inc_messages_per_domain(&self, domain: &str) {
656        let entry = self
657            .messages_per_domain
658            .entry(domain.to_owned())
659            .or_insert_with(|| Arc::new(AtomicU64::new(0)));
660        entry.fetch_add(1, Ordering::Relaxed);
661    }
662
663    /// Read the per-domain counter snapshot.
664    pub fn messages_per_domain(&self) -> HashMap<String, u64> {
665        self.messages_per_domain
666            .iter()
667            .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
668            .collect()
669    }
670
671    /// Register a fresh-reading callback that returns the current per-domain counts.
672    ///
673    /// The callback is invoked at scrape time (every `/metrics` HTTP request) so the
674    /// exposition reflects the queue's current state without the metrics layer having to
675    /// duplicate-track every enqueue. Pair with [`Self::spawn_domain_stats_refresher`]
676    /// for a periodic background snapshot when the source is expensive to query.
677    pub fn set_domain_stats_source(&self, source: DomainStatsSource) {
678        if let Ok(mut guard) = self.domain_stats_source.lock() {
679            *guard = Some(source);
680        }
681    }
682
683    /// Spawn a background task that refreshes the per-domain counters at a fixed cadence.
684    ///
685    /// The task pulls the current snapshot from the registered [`DomainStatsSource`]
686    /// (set via [`Self::set_domain_stats_source`]) and calls
687    /// [`Self::set_messages_per_domain`] for every entry. If no source is configured,
688    /// the task ticks once and exits.
689    pub fn spawn_domain_stats_refresher(&self, period: Duration) -> tokio::task::JoinHandle<()> {
690        let collector = self.clone();
691        tokio::spawn(async move {
692            let mut interval = tokio::time::interval(period);
693            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
694            loop {
695                interval.tick().await;
696                collector.refresh_domain_stats_now();
697            }
698        })
699    }
700
701    /// Refresh the per-domain counters synchronously from the registered source.
702    pub fn refresh_domain_stats_now(&self) {
703        let snapshot = match self.domain_stats_source.lock() {
704            Ok(guard) => match guard.as_ref() {
705                Some(src) => src(),
706                None => return,
707            },
708            Err(_) => return,
709        };
710        for (domain, count) in snapshot {
711            self.set_messages_per_domain(&domain, count);
712        }
713    }
714
715    /// Export metrics in Prometheus text format
716    pub fn export_prometheus(&self) -> String {
717        let mut output = String::new();
718
719        // SMTP metrics
720        output.push_str("# HELP rusmes_smtp_connections_total Total SMTP connections\n");
721        output.push_str("# TYPE rusmes_smtp_connections_total counter\n");
722        output.push_str(&format!(
723            "rusmes_smtp_connections_total {}\n",
724            self.smtp_connections_total.load(Ordering::Relaxed)
725        ));
726
727        output
728            .push_str("# HELP rusmes_smtp_messages_received_total Total SMTP messages received\n");
729        output.push_str("# TYPE rusmes_smtp_messages_received_total counter\n");
730        output.push_str(&format!(
731            "rusmes_smtp_messages_received_total {}\n",
732            self.smtp_messages_received.load(Ordering::Relaxed)
733        ));
734
735        output.push_str("# HELP rusmes_smtp_messages_sent_total Total SMTP messages sent\n");
736        output.push_str("# TYPE rusmes_smtp_messages_sent_total counter\n");
737        output.push_str(&format!(
738            "rusmes_smtp_messages_sent_total {}\n",
739            self.smtp_messages_sent.load(Ordering::Relaxed)
740        ));
741
742        output.push_str("# HELP rusmes_smtp_errors_total Total SMTP errors\n");
743        output.push_str("# TYPE rusmes_smtp_errors_total counter\n");
744        output.push_str(&format!(
745            "rusmes_smtp_errors_total {}\n",
746            self.smtp_errors.load(Ordering::Relaxed)
747        ));
748
749        output.push_str(
750            "# HELP rusmes_smtp_auth_success_total Total successful SMTP AUTH exchanges\n",
751        );
752        output.push_str("# TYPE rusmes_smtp_auth_success_total counter\n");
753        output.push_str(&format!(
754            "rusmes_smtp_auth_success_total {}\n",
755            self.smtp_auth_success_total.load(Ordering::Relaxed)
756        ));
757
758        output.push_str("# HELP rusmes_smtp_auth_failure_total Total failed SMTP AUTH exchanges\n");
759        output.push_str("# TYPE rusmes_smtp_auth_failure_total counter\n");
760        output.push_str(&format!(
761            "rusmes_smtp_auth_failure_total {}\n",
762            self.smtp_auth_failure_total.load(Ordering::Relaxed)
763        ));
764
765        output.push_str("# HELP rusmes_smtp_messages_rejected_total Total SMTP messages rejected due to size limit exceeded during DATA\n");
766        output.push_str("# TYPE rusmes_smtp_messages_rejected_total counter\n");
767        output.push_str(&format!(
768            "rusmes_smtp_messages_rejected_total {}\n",
769            self.smtp_messages_rejected_total.load(Ordering::Relaxed)
770        ));
771
772        output.push_str("# HELP rusmes_smtp_connections_rejected_blocked_total Total SMTP connections rejected due to blocked IP\n");
773        output.push_str("# TYPE rusmes_smtp_connections_rejected_blocked_total counter\n");
774        output.push_str(&format!(
775            "rusmes_smtp_connections_rejected_blocked_total {}\n",
776            self.smtp_connections_rejected_blocked
777                .load(Ordering::Relaxed)
778        ));
779
780        output.push_str("# HELP rusmes_smtp_connections_rejected_overload_total Total SMTP connections rejected due to per-IP connection cap exceeded\n");
781        output.push_str("# TYPE rusmes_smtp_connections_rejected_overload_total counter\n");
782        output.push_str(&format!(
783            "rusmes_smtp_connections_rejected_overload_total {}\n",
784            self.smtp_connections_rejected_overload
785                .load(Ordering::Relaxed)
786        ));
787
788        // IMAP metrics
789        output.push_str("# HELP rusmes_imap_connections_total Total IMAP connections\n");
790        output.push_str("# TYPE rusmes_imap_connections_total counter\n");
791        output.push_str(&format!(
792            "rusmes_imap_connections_total {}\n",
793            self.imap_connections_total.load(Ordering::Relaxed)
794        ));
795
796        output.push_str("# HELP rusmes_imap_commands_total Total IMAP commands\n");
797        output.push_str("# TYPE rusmes_imap_commands_total counter\n");
798        output.push_str(&format!(
799            "rusmes_imap_commands_total {}\n",
800            self.imap_commands_total.load(Ordering::Relaxed)
801        ));
802
803        output.push_str("# HELP rusmes_imap_errors_total Total IMAP errors\n");
804        output.push_str("# TYPE rusmes_imap_errors_total counter\n");
805        output.push_str(&format!(
806            "rusmes_imap_errors_total {}\n",
807            self.imap_errors.load(Ordering::Relaxed)
808        ));
809
810        // JMAP metrics
811        output.push_str("# HELP rusmes_jmap_requests_total Total JMAP requests\n");
812        output.push_str("# TYPE rusmes_jmap_requests_total counter\n");
813        output.push_str(&format!(
814            "rusmes_jmap_requests_total {}\n",
815            self.jmap_requests_total.load(Ordering::Relaxed)
816        ));
817
818        output.push_str("# HELP rusmes_jmap_errors_total Total JMAP errors\n");
819        output.push_str("# TYPE rusmes_jmap_errors_total counter\n");
820        output.push_str(&format!(
821            "rusmes_jmap_errors_total {}\n",
822            self.jmap_errors.load(Ordering::Relaxed)
823        ));
824
825        // WebPush delivery metrics
826        output
827            .push_str("# HELP rusmes_push_deliveries_total Total successful WebPush deliveries\n");
828        output.push_str("# TYPE rusmes_push_deliveries_total counter\n");
829        output.push_str(&format!(
830            "rusmes_push_deliveries_total {}\n",
831            self.push_deliveries_total.load(Ordering::Relaxed)
832        ));
833
834        output.push_str(
835            "# HELP rusmes_push_delivery_failures_total Total WebPush final delivery failures\n",
836        );
837        output.push_str("# TYPE rusmes_push_delivery_failures_total counter\n");
838        output.push_str(&format!(
839            "rusmes_push_delivery_failures_total {}\n",
840            self.push_delivery_failures_total.load(Ordering::Relaxed)
841        ));
842
843        // Mail processing metrics
844        output.push_str("# HELP rusmes_mail_processed_total Total mail processed\n");
845        output.push_str("# TYPE rusmes_mail_processed_total counter\n");
846        output.push_str(&format!(
847            "rusmes_mail_processed_total {}\n",
848            self.mail_processed_total.load(Ordering::Relaxed)
849        ));
850
851        output.push_str("# HELP rusmes_mail_delivered_total Total mail delivered\n");
852        output.push_str("# TYPE rusmes_mail_delivered_total counter\n");
853        output.push_str(&format!(
854            "rusmes_mail_delivered_total {}\n",
855            self.mail_delivered_total.load(Ordering::Relaxed)
856        ));
857
858        output.push_str("# HELP rusmes_mail_bounced_total Total mail bounced\n");
859        output.push_str("# TYPE rusmes_mail_bounced_total counter\n");
860        output.push_str(&format!(
861            "rusmes_mail_bounced_total {}\n",
862            self.mail_bounced_total.load(Ordering::Relaxed)
863        ));
864
865        output.push_str("# HELP rusmes_mail_dropped_total Total mail dropped\n");
866        output.push_str("# TYPE rusmes_mail_dropped_total counter\n");
867        output.push_str(&format!(
868            "rusmes_mail_dropped_total {}\n",
869            self.mail_dropped_total.load(Ordering::Relaxed)
870        ));
871
872        // Queue metrics
873        output.push_str("# HELP rusmes_queue_size Current queue size\n");
874        output.push_str("# TYPE rusmes_queue_size gauge\n");
875        output.push_str(&format!(
876            "rusmes_queue_size {}\n",
877            self.queue_size.load(Ordering::Relaxed)
878        ));
879
880        output.push_str("# HELP rusmes_queue_retries_total Total queue retries\n");
881        output.push_str("# TYPE rusmes_queue_retries_total counter\n");
882        output.push_str(&format!(
883            "rusmes_queue_retries_total {}\n",
884            self.queue_retries.load(Ordering::Relaxed)
885        ));
886
887        // Storage metrics
888        output.push_str("# HELP rusmes_mailboxes_total Total mailboxes\n");
889        output.push_str("# TYPE rusmes_mailboxes_total gauge\n");
890        output.push_str(&format!(
891            "rusmes_mailboxes_total {}\n",
892            self.mailboxes_total.load(Ordering::Relaxed)
893        ));
894
895        output.push_str("# HELP rusmes_messages_total Total messages\n");
896        output.push_str("# TYPE rusmes_messages_total gauge\n");
897        output.push_str(&format!(
898            "rusmes_messages_total {}\n",
899            self.messages_total.load(Ordering::Relaxed)
900        ));
901
902        output.push_str("# HELP rusmes_storage_bytes Total storage bytes\n");
903        output.push_str("# TYPE rusmes_storage_bytes gauge\n");
904        output.push_str(&format!(
905            "rusmes_storage_bytes {}\n",
906            self.storage_bytes.load(Ordering::Relaxed)
907        ));
908
909        // Histogram metrics
910        output.push_str(&self.message_processing_latency.export(
911            "rusmes_message_processing_latency_seconds",
912            "Message processing latency in seconds",
913        ));
914
915        output.push_str(&self.smtp_session_duration.export(
916            "rusmes_smtp_session_duration_seconds",
917            "SMTP session duration in seconds",
918        ));
919
920        // Active connections gauge (label: protocol)
921        output.push_str(
922            "# HELP rusmes_active_connections Currently open client connections per protocol\n",
923        );
924        output.push_str("# TYPE rusmes_active_connections gauge\n");
925        // Sort by label for deterministic output (eases tests + diff-based scrape consumers).
926        let mut active: Vec<(String, i64)> = self
927            .active_connections
928            .iter()
929            .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
930            .collect();
931        active.sort_by(|a, b| a.0.cmp(&b.0));
932        for (protocol, value) in active {
933            output.push_str(&format!(
934                "rusmes_active_connections{{protocol=\"{}\"}} {}\n",
935                escape_label_value(&protocol),
936                value
937            ));
938        }
939
940        // TLS sessions counter (label: tls)
941        output.push_str(
942            "# HELP rusmes_tls_sessions_total Total client sessions seen, partitioned by TLS state\n",
943        );
944        output.push_str("# TYPE rusmes_tls_sessions_total counter\n");
945        let mut tls: Vec<(String, u64)> = self
946            .tls_sessions_total
947            .iter()
948            .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
949            .collect();
950        tls.sort_by(|a, b| a.0.cmp(&b.0));
951        for (label, value) in tls {
952            output.push_str(&format!(
953                "rusmes_tls_sessions_total{{tls=\"{}\"}} {}\n",
954                escape_label_value(&label),
955                value
956            ));
957        }
958
959        // Per-domain message counter (label: domain)
960        // Pull a fresh snapshot from the registered source if any (typically Cluster 4's
961        // MailQueue::queue_stats_per_domain) so the scrape reflects live state.
962        self.refresh_domain_stats_now();
963        output.push_str(
964            "# HELP rusmes_messages_per_domain_total Total messages enqueued per recipient domain\n",
965        );
966        output.push_str("# TYPE rusmes_messages_per_domain_total counter\n");
967        let mut domains: Vec<(String, u64)> = self
968            .messages_per_domain
969            .iter()
970            .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
971            .collect();
972        domains.sort_by(|a, b| a.0.cmp(&b.0));
973        for (domain, value) in domains {
974            output.push_str(&format!(
975                "rusmes_messages_per_domain_total{{domain=\"{}\"}} {}\n",
976                escape_label_value(&domain),
977                value
978            ));
979        }
980
981        output
982    }
983
984    /// Start the HTTP metrics server
985    pub async fn start_http_server(self, config: MetricsConfig) -> anyhow::Result<()> {
986        if !config.enabled {
987            eprintln!("Metrics HTTP server is disabled");
988            return Ok(());
989        }
990
991        config.validate_bind_address()?;
992        config.validate_path()?;
993
994        let bind_address = config.bind_address.clone();
995        let metrics_path = config.path.clone();
996        let basic_auth = config.basic_auth.clone();
997        let app = self.build_router(&metrics_path, basic_auth);
998
999        eprintln!(
1000            "Starting metrics HTTP server on {}{}",
1001            bind_address, metrics_path
1002        );
1003        eprintln!("Health check endpoints: /health, /ready, /live");
1004
1005        let listener = TcpListener::bind(&bind_address).await?;
1006        axum::serve(listener, app).await?;
1007
1008        Ok(())
1009    }
1010
1011    /// Build the axum router used by [`Self::start_http_server`].
1012    ///
1013    /// Exposed so tests (and embedders) can drive the handler in-process via
1014    /// `tower::ServiceExt::oneshot` without binding a TCP socket.
1015    pub fn build_router(
1016        self,
1017        metrics_path: &str,
1018        basic_auth: Option<MetricsBasicAuthConfig>,
1019    ) -> Router {
1020        let metrics = Arc::new(Mutex::new(self));
1021
1022        let metrics_handler = {
1023            let metrics = Arc::clone(&metrics);
1024            move || {
1025                let metrics = Arc::clone(&metrics);
1026                async move {
1027                    let collector = match metrics.lock() {
1028                        Ok(guard) => guard,
1029                        Err(e) => {
1030                            ::tracing::error!("Metrics mutex poisoned: {e}");
1031                            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
1032                        }
1033                    };
1034                    let output = collector.export_prometheus();
1035                    (
1036                        [(header::CONTENT_TYPE, "text/plain; version=0.0.4")],
1037                        output,
1038                    )
1039                        .into_response()
1040                }
1041            }
1042        };
1043
1044        let mut metrics_router = Router::new().route(metrics_path, get(metrics_handler));
1045        if let Some(auth) = basic_auth {
1046            let state = Arc::new(BasicAuthState { config: auth });
1047            metrics_router =
1048                metrics_router.layer(middleware::from_fn_with_state(state, basic_auth_middleware));
1049        }
1050
1051        let health_router = create_health_router();
1052        Router::new().merge(metrics_router).merge(health_router)
1053    }
1054}
1055
1056/// Internal state for the basic-auth middleware (cloneable handle into the config).
1057#[derive(Clone)]
1058struct BasicAuthState {
1059    config: MetricsBasicAuthConfig,
1060}
1061
1062/// Axum middleware that enforces HTTP Basic auth (RFC 7617) against a bcrypt password hash.
1063///
1064/// Returns `401 Unauthorized` with a `WWW-Authenticate: Basic realm="rusmes-metrics"` header
1065/// on missing/malformed/incorrect credentials. On success the request is forwarded unchanged.
1066async fn basic_auth_middleware(
1067    State(state): State<Arc<BasicAuthState>>,
1068    request: Request,
1069    next: Next,
1070) -> Response {
1071    let header_value = request.headers().get(header::AUTHORIZATION);
1072    if !verify_basic_auth(header_value, &state.config) {
1073        let mut response = StatusCode::UNAUTHORIZED.into_response();
1074        let realm = HeaderValue::from_static("Basic realm=\"rusmes-metrics\", charset=\"UTF-8\"");
1075        response
1076            .headers_mut()
1077            .insert(header::WWW_AUTHENTICATE, realm);
1078        // Replace the body so curl/Prometheus operators see something useful.
1079        *response.body_mut() = Body::from("401 Unauthorized: metrics endpoint requires basic auth");
1080        return response;
1081    }
1082    next.run(request).await
1083}
1084
1085/// Verify a `Authorization: Basic <base64>` header against the configured credentials.
1086///
1087/// Returns `true` only when:
1088/// 1. The header is present, well-formed, and base64-decodes,
1089/// 2. The username matches exactly (constant-time comparison via [`bytes_eq_constant_time`]),
1090/// 3. The password verifies against the bcrypt hash.
1091fn verify_basic_auth(header_value: Option<&HeaderValue>, config: &MetricsBasicAuthConfig) -> bool {
1092    let value = match header_value.and_then(|v| v.to_str().ok()) {
1093        Some(v) => v,
1094        None => return false,
1095    };
1096    let encoded = match value.strip_prefix("Basic ") {
1097        Some(s) => s.trim(),
1098        None => return false,
1099    };
1100    let decoded = match BASE64.decode(encoded) {
1101        Ok(bytes) => bytes,
1102        Err(_) => return false,
1103    };
1104    let credentials = match std::str::from_utf8(&decoded) {
1105        Ok(s) => s,
1106        Err(_) => return false,
1107    };
1108    let (user, password) = match credentials.split_once(':') {
1109        Some(parts) => parts,
1110        None => return false,
1111    };
1112    if !bytes_eq_constant_time(user.as_bytes(), config.username.as_bytes()) {
1113        return false;
1114    }
1115    match bcrypt::verify(password, &config.password_hash) {
1116        Ok(ok) => ok,
1117        Err(e) => {
1118            ::tracing::warn!(
1119                "bcrypt verify failed for metrics basic auth (likely malformed hash in config): {e}"
1120            );
1121            false
1122        }
1123    }
1124}
1125
/// Byte-slice equality without data-dependent early exits.
///
/// A length mismatch is folded into the accumulator instead of returning early. The loop
/// always runs `a.len()` iterations, and byte differences are OR-ed together rather than
/// short-circuited, so the iteration count never depends on `b`. Pass the untrusted input
/// as `a` and the secret as `b`.
///
/// This is best-effort: the compiler is not formally prevented from introducing branches.
fn bytes_eq_constant_time(a: &[u8], b: &[u8]) -> bool {
    // Non-zero whenever the lengths differ, which guarantees a `false` result below.
    let mut diff = a.len() ^ b.len();
    for (i, &x) in a.iter().enumerate() {
        // Pad a shorter `b` with zeros. This only matters when the lengths differ, and in
        // that case `diff` is already non-zero.
        let y = b.get(i).copied().unwrap_or(0);
        diff |= usize::from(x ^ y);
    }
    diff == 0
}
1137
/// Escape a string for use inside a double-quoted Prometheus label value.
///
/// The text exposition format
/// (<https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details>)
/// requires escaping backslash as `\\`, double quote as `\"`, and line feed as `\n`.
/// Every other character, including non-ASCII UTF-8, is copied through unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    value.chars().for_each(|c| match c {
        '\\' => escaped.push_str(r"\\"),
        '"' => escaped.push_str(r#"\""#),
        '\n' => escaped.push_str(r"\n"),
        _ => escaped.push(c),
    });
    escaped
}
1155
1156/// Health check response
1157#[derive(Debug, Serialize, Clone)]
1158pub struct HealthResponse {
1159    pub status: String,
1160    pub checks: HealthChecks,
1161}
1162
1163/// Individual health checks
1164#[derive(Debug, Serialize, Clone)]
1165pub struct HealthChecks {
1166    pub storage: String,
1167    pub queue: String,
1168}
1169
1170/// Readiness probe response
1171#[derive(Debug, Serialize, Clone)]
1172pub struct ReadyResponse {
1173    pub ready: bool,
1174}
1175
1176/// Liveness probe response
1177#[derive(Debug, Serialize, Clone)]
1178pub struct LiveResponse {
1179    pub alive: bool,
1180}
1181
1182/// Health check handler
1183async fn health_check() -> (StatusCode, Json<HealthResponse>) {
1184    let storage_status = check_storage().await;
1185    let queue_status = check_queue().await;
1186
1187    let all_healthy = storage_status == "healthy" && queue_status == "healthy";
1188    let status_code = if all_healthy {
1189        StatusCode::OK
1190    } else {
1191        StatusCode::SERVICE_UNAVAILABLE
1192    };
1193
1194    let response = HealthResponse {
1195        status: if all_healthy {
1196            "healthy".to_string()
1197        } else {
1198            "unhealthy".to_string()
1199        },
1200        checks: HealthChecks {
1201            storage: storage_status,
1202            queue: queue_status,
1203        },
1204    };
1205
1206    (status_code, Json(response))
1207}
1208
1209/// Readiness probe handler
1210async fn readiness_check() -> (StatusCode, Json<ReadyResponse>) {
1211    let ready = true;
1212
1213    let status_code = if ready {
1214        StatusCode::OK
1215    } else {
1216        StatusCode::SERVICE_UNAVAILABLE
1217    };
1218
1219    (status_code, Json(ReadyResponse { ready }))
1220}
1221
1222/// Liveness probe handler
1223async fn liveness_check() -> (StatusCode, Json<LiveResponse>) {
1224    (StatusCode::OK, Json(LiveResponse { alive: true }))
1225}
1226
/// Storage-backend health probe consulted by `/health`.
///
/// Placeholder: it performs no actual check and always reports `"healthy"`.
async fn check_storage() -> String {
    String::from("healthy")
}
1231
/// Mail-queue health probe consulted by `/health`.
///
/// Placeholder: it performs no actual check and always reports `"healthy"`.
async fn check_queue() -> String {
    String::from("healthy")
}
1236
1237/// Create health check router
1238pub fn create_health_router() -> Router {
1239    Router::new()
1240        .route("/health", get(health_check))
1241        .route("/ready", get(readiness_check))
1242        .route("/live", get(liveness_check))
1243}
1244
// Unit tests for the collector, the Prometheus text exposition, the basic-auth guard and
// the HTTP router. HTTP behaviour is exercised in-process by calling
// `tower::ServiceExt::oneshot` on the router returned by `build_router`, so no TCP socket
// is bound.
#[cfg(test)]
mod tests {
    use super::*;
    // HTTP-level test helpers. `Request` is aliased to `HttpRequest` so it stays distinct
    // from the `Request` type used by `basic_auth_middleware`.
    use axum::body::Body;
    use axum::http::Request as HttpRequest;
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    /// Each `inc_*` helper bumps its own counter by exactly one.
    #[test]
    fn test_metrics_collector() {
        let metrics = MetricsCollector::new();

        metrics.inc_smtp_connections();
        metrics.inc_smtp_messages_received();
        metrics.inc_mail_processed();
        metrics.inc_mail_delivered();

        assert_eq!(metrics.smtp_connections_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.smtp_messages_received.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_processed_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_delivered_total.load(Ordering::Relaxed), 1);
    }

    /// Counter and gauge values show up in the text exposition together with HELP/TYPE lines.
    #[test]
    fn test_prometheus_export() {
        let metrics = MetricsCollector::new();
        metrics.inc_smtp_connections();
        metrics.set_queue_size(42);

        let output = metrics.export_prometheus();

        assert!(output.contains("rusmes_smtp_connections_total 1"));
        assert!(output.contains("rusmes_queue_size 42"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    /// Active-connections gauge round-trips through `connection_guard`'s RAII Drop.
    ///
    /// Cluster 7B: every protocol session opens via `inc()` and closes via `dec()` —
    /// when both run, the gauge must return to its baseline.
    #[test]
    fn test_active_connections_guard_roundtrip() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("smtp"), 0);

        {
            let _g = metrics.connection_guard("smtp");
            assert_eq!(metrics.active_connections("smtp"), 1);
            {
                // Nested guard: the gauge counts concurrent sessions, not just presence.
                let _g2 = metrics.connection_guard("smtp");
                assert_eq!(metrics.active_connections("smtp"), 2);
            }
            assert_eq!(metrics.active_connections("smtp"), 1);
        }
        assert_eq!(metrics.active_connections("smtp"), 0);

        // Per-protocol isolation: incrementing imap doesn't affect smtp.
        let _g = metrics.connection_guard("imap");
        assert_eq!(metrics.active_connections("imap"), 1);
        assert_eq!(metrics.active_connections("smtp"), 0);
    }

    /// TLS counter labels (`yes`, `no`, `starttls`) are reported under the `tls=` label
    /// and accumulated across calls.
    #[test]
    fn test_tls_session_counter_labels() {
        let metrics = MetricsCollector::new();
        metrics.inc_tls_session(tls_label::NO);
        metrics.inc_tls_session(tls_label::NO);
        metrics.inc_tls_session(tls_label::STARTTLS);
        metrics.inc_tls_session(tls_label::YES);

        assert_eq!(metrics.tls_session_count(tls_label::NO), 2);
        assert_eq!(metrics.tls_session_count(tls_label::STARTTLS), 1);
        assert_eq!(metrics.tls_session_count(tls_label::YES), 1);

        // The same values must appear in the exposition, one line per label value.
        let exp = metrics.export_prometheus();
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"no\"} 2"));
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"starttls\"} 1"));
        assert!(exp.contains("rusmes_tls_sessions_total{tls=\"yes\"} 1"));
        assert!(exp.contains("# TYPE rusmes_tls_sessions_total counter"));
    }

    /// Per-domain counter is fed by the registered `DomainStatsSource` callback and
    /// surfaces under the `domain=` label in the Prometheus exposition.
    ///
    /// Cluster 7D: data flows from `rusmes_core::MailQueue::queue_stats_per_domain()`
    /// (or any other source) via the callback; the metrics layer just mirrors values
    /// at scrape time.
    #[test]
    fn test_messages_per_domain_from_callback_source() {
        let metrics = MetricsCollector::new();
        metrics.set_domain_stats_source(Arc::new(|| {
            let mut m = HashMap::new();
            m.insert("example.com".to_string(), 5u64);
            m.insert("example.org".to_string(), 3u64);
            m
        }));

        // No explicit refresh: `export_prometheus` pulls from the source itself.
        let exp = metrics.export_prometheus();
        assert!(
            exp.contains("rusmes_messages_per_domain_total{domain=\"example.com\"} 5"),
            "exposition was:\n{exp}"
        );
        assert!(exp.contains("rusmes_messages_per_domain_total{domain=\"example.org\"} 3"));
        assert!(exp.contains("# TYPE rusmes_messages_per_domain_total counter"));
    }

    /// Label-value escaping: backslash, double-quote, and newline must be escaped per
    /// the Prometheus exposition spec so that `domain="weird\"value"` round-trips.
    #[test]
    fn test_escape_label_value_quotes_and_backslash() {
        assert_eq!(escape_label_value("plain"), "plain");
        assert_eq!(escape_label_value("a\"b"), "a\\\"b");
        assert_eq!(escape_label_value("a\\b"), "a\\\\b");
        assert_eq!(escape_label_value("a\nb"), "a\\nb");
    }

    /// Constant-time username comparison rejects mismatched lengths and contents.
    #[test]
    fn test_constant_time_eq() {
        assert!(bytes_eq_constant_time(b"abc", b"abc"));
        assert!(!bytes_eq_constant_time(b"abc", b"abd"));
        assert!(!bytes_eq_constant_time(b"abc", b"abcd"));
        assert!(bytes_eq_constant_time(b"", b""));
    }

    /// Build a router with optional basic-auth state and the metrics endpoint at `/metrics`.
    fn router_with_basic_auth(creds: Option<(&str, &str)>) -> Router {
        let metrics = MetricsCollector::new();
        let auth = creds.map(|(u, p)| MetricsBasicAuthConfig {
            username: u.to_string(),
            // Use a low cost so the bcrypt hash is fast for tests.
            password_hash: bcrypt::hash(p, 4).expect("bcrypt hash for test"),
        });
        metrics.build_router("/metrics", auth)
    }

    /// Basic auth: 200 with correct creds, 401 without.
    ///
    /// Cluster 7A.
    #[tokio::test]
    async fn test_metrics_basic_auth_accepts_correct_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        // No credentials → 401.
        let resp = app
            .clone()
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
        // The 401 must carry a Basic challenge so clients know to retry with credentials.
        let www_auth = resp
            .headers()
            .get(header::WWW_AUTHENTICATE)
            .expect("WWW-Authenticate header on 401")
            .to_str()
            .expect("ascii header");
        assert!(www_auth.starts_with("Basic realm="), "got: {www_auth}");

        // Correct credentials → 200 with prometheus body.
        let creds = BASE64.encode(b"scrape:s3cret");
        let resp = app
            .clone()
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .header(header::AUTHORIZATION, format!("Basic {creds}"))
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::OK);
        let body_bytes = resp
            .into_body()
            .collect()
            .await
            .expect("collect")
            .to_bytes();
        let body_text = std::str::from_utf8(&body_bytes).expect("utf-8 body");
        assert!(body_text.contains("# HELP"), "body was:\n{body_text}");
    }

    /// Basic auth: wrong username and wrong password both return 401 (no info leak).
    #[tokio::test]
    async fn test_metrics_basic_auth_rejects_wrong_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        let bad_user = BASE64.encode(b"wrong:s3cret");
        let resp = app
            .clone()
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .header(header::AUTHORIZATION, format!("Basic {bad_user}"))
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);

        let bad_pass = BASE64.encode(b"scrape:wrong");
        let resp = app
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .header(header::AUTHORIZATION, format!("Basic {bad_pass}"))
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// Without a `basic_auth` config block, `/metrics` is served unauthenticated (the
    /// previous default — backwards-compatible).
    #[tokio::test]
    async fn test_metrics_no_basic_auth_serves_anonymously() {
        let app = router_with_basic_auth(None);
        let resp = app
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::OK);
    }

    /// Guard increments the gauge on creation and decrements it on `Drop`.
    #[test]
    fn test_connection_guard_increments_and_decrements() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("pop3"), 0);
        let guard = metrics.connection_guard("pop3");
        assert_eq!(metrics.active_connections("pop3"), 1);
        drop(guard);
        assert_eq!(metrics.active_connections("pop3"), 0);
    }

    /// Active-connections gauges are tracked independently per protocol.
    #[test]
    fn test_connection_metrics_total() {
        let metrics = MetricsCollector::new();
        let _g1 = metrics.connection_guard("smtp");
        let _g2 = metrics.connection_guard("imap");
        let _g3 = metrics.connection_guard("pop3");
        assert_eq!(metrics.active_connections("smtp"), 1);
        assert_eq!(metrics.active_connections("imap"), 1);
        assert_eq!(metrics.active_connections("pop3"), 1);
    }

    /// Prometheus exposition contains active_connections gauge with correct label and value.
    #[test]
    fn test_prometheus_format() {
        let metrics = MetricsCollector::new();
        let _g = metrics.connection_guard("smtp");
        let output = metrics.export_prometheus();
        assert!(
            output.contains("rusmes_active_connections{protocol=\"smtp\"} 1"),
            "prometheus output was:\n{output}"
        );
        assert!(output.contains("# TYPE rusmes_active_connections gauge"));
    }

    /// Metrics router responds HTTP 200 with a Prometheus body when no auth is configured.
    #[tokio::test]
    async fn test_metrics_server_responds() {
        let metrics = MetricsCollector::new();
        // Held for the whole test so the gauge is non-zero when the router scrapes it.
        let _g = metrics.connection_guard("smtp");
        let app = metrics.build_router("/metrics", None);

        let resp = app
            .oneshot(
                HttpRequest::builder()
                    .uri("/metrics")
                    .body(Body::empty())
                    .expect("request build"),
            )
            .await
            .expect("router call");
        assert_eq!(resp.status(), StatusCode::OK);
        let body_bytes = resp
            .into_body()
            .collect()
            .await
            .expect("collect")
            .to_bytes();
        let body_text = std::str::from_utf8(&body_bytes).expect("utf-8 body");
        assert!(
            body_text.contains("rusmes_active_connections"),
            "body was:\n{body_text}"
        );
    }

    /// Verify that the global metrics handle is the same on every call (singleton).
    #[test]
    fn test_global_metrics_singleton() {
        // Compare addresses: equal pointers mean the very same instance was returned.
        let a = global_metrics() as *const _;
        let b = global_metrics() as *const _;
        assert_eq!(a, b, "global_metrics() must return the same instance");
    }
}