1pub mod tracing;
83
84use axum::{
85 body::Body,
86 extract::{Request, State},
87 http::{header, HeaderValue, StatusCode},
88 middleware::{self, Next},
89 response::{IntoResponse, Response},
90 routing::get,
91 Json, Router,
92};
93use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
94use dashmap::DashMap;
95use rusmes_config::{MetricsBasicAuthConfig, MetricsConfig};
96use rusmes_proto::Mail;
97use serde::Serialize;
98use std::collections::HashMap;
99use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
100use std::sync::{Arc, Mutex, OnceLock};
101use std::time::{Duration, Instant};
102use tokio::net::TcpListener;
103
/// Process-wide collector slot.
///
/// Written once by [`set_global_metrics`]; read by [`global_metrics`], which
/// lazily fills it with `MetricsCollector::new()` if nothing was installed yet.
static GLOBAL_METRICS: OnceLock<MetricsCollector> = OnceLock::new();
111
/// Error returned by [`set_global_metrics`] when the global collector slot is
/// already occupied — either by an earlier `set_global_metrics` call or by a
/// collector lazily created through [`global_metrics`].
#[derive(Debug, thiserror::Error)]
#[error("global MetricsCollector has already been initialised")]
pub struct GlobalMetricsAlreadySet;
120
121pub fn set_global_metrics(collector: MetricsCollector) -> Result<(), GlobalMetricsAlreadySet> {
128 GLOBAL_METRICS
129 .set(collector)
130 .map_err(|_| GlobalMetricsAlreadySet)
131}
132
133pub fn global_metrics() -> &'static MetricsCollector {
138 GLOBAL_METRICS.get_or_init(MetricsCollector::new)
139}
140
pub mod tls_label {
    //! Label values for the `tls` label of `rusmes_tls_sessions_total`, listed
    //! in the same (lexicographic) order the exporter emits them.

    /// `tls="no"` — NOTE(review): presumably a session that never used TLS;
    /// confirm against callers of `inc_tls_session`.
    pub const NO: &str = "no";

    /// `tls="starttls"` — NOTE(review): presumably a session upgraded via
    /// STARTTLS; confirm against callers.
    pub const STARTTLS: &str = "starttls";

    /// `tls="yes"` — NOTE(review): presumably a session that was TLS from the
    /// start (implicit TLS); confirm against callers.
    pub const YES: &str = "yes";
}
153
/// Thread-safe callback producing a snapshot of `domain -> message count`.
///
/// Registered with `MetricsCollector::set_domain_stats_source` and invoked by
/// `refresh_domain_stats_now`, whose results overwrite the per-domain values.
pub type DomainStatsSource = Arc<dyn Fn() -> HashMap<String, u64> + Send + Sync + 'static>;
160
/// Minimal lock-free Prometheus histogram with fixed upper bounds (seconds).
///
/// * `counts[i]` is the cumulative number of observations `<= buckets[i]`
///   (Prometheus `le` semantics).
/// * `count` is the total number of observations (also the `+Inf` bucket).
/// * `sum` is the running total of observed values, stored as the raw bit
///   pattern of an `f64`. Keeping full `f64` precision (instead of truncated
///   whole milliseconds) stops sub-millisecond observations from vanishing
///   from `_sum`.
#[derive(Debug, Clone)]
struct Histogram {
    buckets: Vec<f64>,
    counts: Vec<Arc<AtomicU64>>,
    sum: Arc<AtomicU64>,
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Creates an empty histogram with one cumulative counter per bound in
    /// `buckets`. Bounds are expected in ascending order (as exported).
    fn new(buckets: Vec<f64>) -> Self {
        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets,
            counts,
            sum: Arc::new(AtomicU64::new(0.0_f64.to_bits())),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records one observation of `value` seconds.
    fn observe(&self, value: f64) {
        // CAS loop adding `value` to the f64 stored in `sum`; the closure always
        // returns `Some`, so `fetch_update` cannot report failure.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
        self.count.fetch_add(1, Ordering::Relaxed);

        // Cumulative buckets: every bound at or above `value` is incremented.
        for (bound, slot) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                slot.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Current sum of all observations, in seconds.
    fn sum_seconds(&self) -> f64 {
        f64::from_bits(self.sum.load(Ordering::Relaxed))
    }

    /// Renders the histogram in Prometheus text format under metric `name`.
    fn export(&self, name: &str, help: &str) -> String {
        let mut output = String::new();
        output.push_str(&format!("# HELP {} {}\n", name, help));
        output.push_str(&format!("# TYPE {} histogram\n", name));

        for (bound, slot) in self.buckets.iter().zip(&self.counts) {
            output.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                name,
                bound,
                slot.load(Ordering::Relaxed)
            ));
        }

        // Load once so the `+Inf` bucket and `_count` always agree.
        let count = self.count.load(Ordering::Relaxed);
        output.push_str(&format!("{}_bucket{{le=\"+Inf\"}} {}\n", name, count));
        output.push_str(&format!("{}_sum {}\n", name, self.sum_seconds()));
        output.push_str(&format!("{}_count {}\n", name, count));

        output
    }
}
225
226pub struct Timer {
228 start: Instant,
229 histogram: Arc<Histogram>,
230}
231
232impl Timer {
233 fn new(histogram: Arc<Histogram>) -> Self {
234 Self {
235 start: Instant::now(),
236 histogram,
237 }
238 }
239
240 pub fn observe(self) {
241 let duration = self.start.elapsed().as_secs_f64();
242 self.histogram.observe(duration);
243 }
244}
245
246pub struct ConnectionGuard {
250 metrics: MetricsCollector,
251 protocol: String,
252}
253
254impl Drop for ConnectionGuard {
255 fn drop(&mut self) {
256 self.metrics.dec_active_connections(&self.protocol);
257 }
258}
259
/// Registry of RusMES runtime metrics, rendered by
/// [`MetricsCollector::export_prometheus`].
///
/// Every field is wrapped in an `Arc`, so `Clone` produces another handle onto
/// the *same* counters rather than a copy; clones can be handed to protocol
/// handlers and background tasks freely.
#[derive(Clone)]
pub struct MetricsCollector {
    // SMTP counters (exported as `rusmes_smtp_*` counters).
    smtp_connections_total: Arc<AtomicU64>,
    smtp_messages_received: Arc<AtomicU64>,
    smtp_messages_sent: Arc<AtomicU64>,
    smtp_errors: Arc<AtomicU64>,
    smtp_auth_success_total: Arc<AtomicU64>,
    smtp_auth_failure_total: Arc<AtomicU64>,
    smtp_messages_rejected_total: Arc<AtomicU64>,
    smtp_connections_rejected_blocked: Arc<AtomicU64>,
    smtp_connections_rejected_overload: Arc<AtomicU64>,

    // IMAP counters.
    imap_connections_total: Arc<AtomicU64>,
    imap_commands_total: Arc<AtomicU64>,
    imap_errors: Arc<AtomicU64>,

    // JMAP counters.
    jmap_requests_total: Arc<AtomicU64>,
    jmap_errors: Arc<AtomicU64>,

    // WebPush delivery counters.
    push_deliveries_total: Arc<AtomicU64>,
    push_delivery_failures_total: Arc<AtomicU64>,

    // Mail pipeline outcome counters.
    mail_processed_total: Arc<AtomicU64>,
    mail_delivered_total: Arc<AtomicU64>,
    mail_bounced_total: Arc<AtomicU64>,
    mail_dropped_total: Arc<AtomicU64>,

    // Queue: `queue_size` is a gauge (overwritten by `set_queue_size`),
    // `queue_retries` a counter.
    queue_size: Arc<AtomicU64>,
    queue_retries: Arc<AtomicU64>,

    // Storage gauges, overwritten by their `set_*` methods.
    mailboxes_total: Arc<AtomicU64>,
    messages_total: Arc<AtomicU64>,
    storage_bytes: Arc<AtomicU64>,

    // Latency histograms (bucket bounds in seconds are chosen in `new`).
    message_processing_latency: Arc<Histogram>,
    smtp_session_duration: Arc<Histogram>,

    // protocol name -> currently open connections. Signed because a decrement
    // without a matching increment is allowed to go below zero.
    active_connections: Arc<DashMap<String, Arc<AtomicI64>>>,

    // TLS label value (see `tls_label`) -> sessions seen.
    tls_sessions_total: Arc<DashMap<String, Arc<AtomicU64>>>,

    // recipient domain -> message count (set directly or from the stats source).
    messages_per_domain: Arc<DashMap<String, Arc<AtomicU64>>>,

    // Optional callback consulted by `refresh_domain_stats_now`.
    domain_stats_source: Arc<Mutex<Option<DomainStatsSource>>>,
}
318
319impl std::fmt::Debug for MetricsCollector {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 f.debug_struct("MetricsCollector")
322 .field(
323 "smtp_connections_total",
324 &self.smtp_connections_total.load(Ordering::Relaxed),
325 )
326 .field(
327 "active_connections_protocols",
328 &self.active_connections.len(),
329 )
330 .field("tls_label_count", &self.tls_sessions_total.len())
331 .field("domain_label_count", &self.messages_per_domain.len())
332 .finish()
333 }
334}
335
336impl Default for MetricsCollector {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl MetricsCollector {
343 pub fn new() -> Self {
345 let latency_buckets = vec![
347 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
348 ];
349 let duration_buckets = vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0];
351
352 Self {
353 smtp_connections_total: Arc::new(AtomicU64::new(0)),
354 smtp_messages_received: Arc::new(AtomicU64::new(0)),
355 smtp_messages_sent: Arc::new(AtomicU64::new(0)),
356 smtp_errors: Arc::new(AtomicU64::new(0)),
357 smtp_auth_success_total: Arc::new(AtomicU64::new(0)),
358 smtp_auth_failure_total: Arc::new(AtomicU64::new(0)),
359 smtp_messages_rejected_total: Arc::new(AtomicU64::new(0)),
360 smtp_connections_rejected_blocked: Arc::new(AtomicU64::new(0)),
361 smtp_connections_rejected_overload: Arc::new(AtomicU64::new(0)),
362 imap_connections_total: Arc::new(AtomicU64::new(0)),
363 imap_commands_total: Arc::new(AtomicU64::new(0)),
364 imap_errors: Arc::new(AtomicU64::new(0)),
365 jmap_requests_total: Arc::new(AtomicU64::new(0)),
366 jmap_errors: Arc::new(AtomicU64::new(0)),
367 push_deliveries_total: Arc::new(AtomicU64::new(0)),
368 push_delivery_failures_total: Arc::new(AtomicU64::new(0)),
369 mail_processed_total: Arc::new(AtomicU64::new(0)),
370 mail_delivered_total: Arc::new(AtomicU64::new(0)),
371 mail_bounced_total: Arc::new(AtomicU64::new(0)),
372 mail_dropped_total: Arc::new(AtomicU64::new(0)),
373 queue_size: Arc::new(AtomicU64::new(0)),
374 queue_retries: Arc::new(AtomicU64::new(0)),
375 mailboxes_total: Arc::new(AtomicU64::new(0)),
376 messages_total: Arc::new(AtomicU64::new(0)),
377 storage_bytes: Arc::new(AtomicU64::new(0)),
378 message_processing_latency: Arc::new(Histogram::new(latency_buckets)),
379 smtp_session_duration: Arc::new(Histogram::new(duration_buckets)),
380 active_connections: Arc::new(DashMap::new()),
381 tls_sessions_total: Arc::new(DashMap::new()),
382 messages_per_domain: Arc::new(DashMap::new()),
383 domain_stats_source: Arc::new(Mutex::new(None)),
384 }
385 }
386
387 pub fn record_mail_completed(&self, _mail: &Mail) {
389 self.inc_mail_processed();
390 self.inc_mail_delivered();
391 }
392
393 pub fn inc_smtp_connections(&self) {
395 self.smtp_connections_total.fetch_add(1, Ordering::Relaxed);
396 }
397
398 pub fn inc_smtp_messages_received(&self) {
399 self.smtp_messages_received.fetch_add(1, Ordering::Relaxed);
400 }
401
402 pub fn inc_smtp_messages_sent(&self) {
403 self.smtp_messages_sent.fetch_add(1, Ordering::Relaxed);
404 }
405
406 pub fn inc_smtp_errors(&self) {
407 self.smtp_errors.fetch_add(1, Ordering::Relaxed);
408 }
409
410 pub fn inc_smtp_auth_success(&self) {
412 self.smtp_auth_success_total.fetch_add(1, Ordering::Relaxed);
413 }
414
415 pub fn inc_smtp_auth_failure(&self) {
417 self.smtp_auth_failure_total.fetch_add(1, Ordering::Relaxed);
418 }
419
420 pub fn inc_smtp_messages_rejected(&self) {
422 self.smtp_messages_rejected_total
423 .fetch_add(1, Ordering::Relaxed);
424 }
425
426 pub fn smtp_auth_success_count(&self) -> u64 {
428 self.smtp_auth_success_total.load(Ordering::Relaxed)
429 }
430
431 pub fn smtp_auth_failure_count(&self) -> u64 {
433 self.smtp_auth_failure_total.load(Ordering::Relaxed)
434 }
435
436 pub fn smtp_messages_rejected_count(&self) -> u64 {
438 self.smtp_messages_rejected_total.load(Ordering::Relaxed)
439 }
440
441 pub fn inc_smtp_connections_rejected_blocked(&self) {
443 self.smtp_connections_rejected_blocked
444 .fetch_add(1, Ordering::Relaxed);
445 }
446
447 pub fn smtp_connections_rejected_blocked_count(&self) -> u64 {
449 self.smtp_connections_rejected_blocked
450 .load(Ordering::Relaxed)
451 }
452
453 pub fn inc_smtp_connections_rejected_overload(&self) {
455 self.smtp_connections_rejected_overload
456 .fetch_add(1, Ordering::Relaxed);
457 }
458
459 pub fn smtp_connections_rejected_overload_count(&self) -> u64 {
461 self.smtp_connections_rejected_overload
462 .load(Ordering::Relaxed)
463 }
464
465 pub fn smtp_messages_accepted_count(&self) -> u64 {
467 self.smtp_messages_received.load(Ordering::Relaxed)
468 }
469
470 pub fn smtp_connections_count(&self) -> u64 {
472 self.smtp_connections_total.load(Ordering::Relaxed)
473 }
474
475 pub fn inc_imap_connections(&self) {
477 self.imap_connections_total.fetch_add(1, Ordering::Relaxed);
478 }
479
480 pub fn inc_imap_commands(&self) {
481 self.imap_commands_total.fetch_add(1, Ordering::Relaxed);
482 }
483
484 pub fn inc_imap_errors(&self) {
485 self.imap_errors.fetch_add(1, Ordering::Relaxed);
486 }
487
488 pub fn inc_jmap_requests(&self) {
490 self.jmap_requests_total.fetch_add(1, Ordering::Relaxed);
491 }
492
493 pub fn inc_jmap_errors(&self) {
494 self.jmap_errors.fetch_add(1, Ordering::Relaxed);
495 }
496
497 pub fn inc_push_deliveries(&self) {
499 self.push_deliveries_total.fetch_add(1, Ordering::Relaxed);
500 }
501
502 pub fn inc_push_delivery_failures(&self) {
504 self.push_delivery_failures_total
505 .fetch_add(1, Ordering::Relaxed);
506 }
507
508 pub fn push_deliveries_count(&self) -> u64 {
510 self.push_deliveries_total.load(Ordering::Relaxed)
511 }
512
513 pub fn push_delivery_failures_count(&self) -> u64 {
515 self.push_delivery_failures_total.load(Ordering::Relaxed)
516 }
517
518 pub fn inc_mail_processed(&self) {
520 self.mail_processed_total.fetch_add(1, Ordering::Relaxed);
521 }
522
523 pub fn inc_mail_delivered(&self) {
524 self.mail_delivered_total.fetch_add(1, Ordering::Relaxed);
525 }
526
527 pub fn inc_mail_bounced(&self) {
528 self.mail_bounced_total.fetch_add(1, Ordering::Relaxed);
529 }
530
531 pub fn inc_mail_dropped(&self) {
532 self.mail_dropped_total.fetch_add(1, Ordering::Relaxed);
533 }
534
535 pub fn set_queue_size(&self, size: u64) {
537 self.queue_size.store(size, Ordering::Relaxed);
538 }
539
540 pub fn inc_queue_retries(&self) {
541 self.queue_retries.fetch_add(1, Ordering::Relaxed);
542 }
543
544 pub fn set_mailboxes_total(&self, count: u64) {
546 self.mailboxes_total.store(count, Ordering::Relaxed);
547 }
548
549 pub fn set_messages_total(&self, count: u64) {
550 self.messages_total.store(count, Ordering::Relaxed);
551 }
552
553 pub fn set_storage_bytes(&self, bytes: u64) {
554 self.storage_bytes.store(bytes, Ordering::Relaxed);
555 }
556
557 pub fn start_message_processing_timer(&self) -> Timer {
559 Timer::new(Arc::clone(&self.message_processing_latency))
560 }
561
562 pub fn start_smtp_session_timer(&self) -> Timer {
563 Timer::new(Arc::clone(&self.smtp_session_duration))
564 }
565
566 pub fn inc_active_connections(&self, protocol: &str) {
570 let entry = self
571 .active_connections
572 .entry(protocol.to_owned())
573 .or_insert_with(|| Arc::new(AtomicI64::new(0)));
574 entry.fetch_add(1, Ordering::Relaxed);
575 }
576
577 pub fn dec_active_connections(&self, protocol: &str) {
583 let entry = self
584 .active_connections
585 .entry(protocol.to_owned())
586 .or_insert_with(|| Arc::new(AtomicI64::new(0)));
587 entry.fetch_sub(1, Ordering::Relaxed);
588 }
589
590 pub fn active_connections(&self, protocol: &str) -> i64 {
592 self.active_connections
593 .get(protocol)
594 .map(|v| v.load(Ordering::Relaxed))
595 .unwrap_or(0)
596 }
597
598 pub fn connection_guard(&self, protocol: &str) -> ConnectionGuard {
609 self.inc_active_connections(protocol);
610 ConnectionGuard {
611 metrics: self.clone(),
612 protocol: protocol.to_owned(),
613 }
614 }
615
616 pub fn inc_tls_session(&self, tls_kind: &str) {
624 let entry = self
625 .tls_sessions_total
626 .entry(tls_kind.to_owned())
627 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
628 entry.fetch_add(1, Ordering::Relaxed);
629 }
630
631 pub fn tls_session_count(&self, tls_kind: &str) -> u64 {
633 self.tls_sessions_total
634 .get(tls_kind)
635 .map(|v| v.load(Ordering::Relaxed))
636 .unwrap_or(0)
637 }
638
639 pub fn set_messages_per_domain(&self, domain: &str, count: u64) {
646 let entry = self
647 .messages_per_domain
648 .entry(domain.to_owned())
649 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
650 entry.store(count, Ordering::Relaxed);
651 }
652
653 pub fn inc_messages_per_domain(&self, domain: &str) {
656 let entry = self
657 .messages_per_domain
658 .entry(domain.to_owned())
659 .or_insert_with(|| Arc::new(AtomicU64::new(0)));
660 entry.fetch_add(1, Ordering::Relaxed);
661 }
662
663 pub fn messages_per_domain(&self) -> HashMap<String, u64> {
665 self.messages_per_domain
666 .iter()
667 .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
668 .collect()
669 }
670
671 pub fn set_domain_stats_source(&self, source: DomainStatsSource) {
678 if let Ok(mut guard) = self.domain_stats_source.lock() {
679 *guard = Some(source);
680 }
681 }
682
683 pub fn spawn_domain_stats_refresher(&self, period: Duration) -> tokio::task::JoinHandle<()> {
690 let collector = self.clone();
691 tokio::spawn(async move {
692 let mut interval = tokio::time::interval(period);
693 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
694 loop {
695 interval.tick().await;
696 collector.refresh_domain_stats_now();
697 }
698 })
699 }
700
701 pub fn refresh_domain_stats_now(&self) {
703 let snapshot = match self.domain_stats_source.lock() {
704 Ok(guard) => match guard.as_ref() {
705 Some(src) => src(),
706 None => return,
707 },
708 Err(_) => return,
709 };
710 for (domain, count) in snapshot {
711 self.set_messages_per_domain(&domain, count);
712 }
713 }
714
715 pub fn export_prometheus(&self) -> String {
717 let mut output = String::new();
718
719 output.push_str("# HELP rusmes_smtp_connections_total Total SMTP connections\n");
721 output.push_str("# TYPE rusmes_smtp_connections_total counter\n");
722 output.push_str(&format!(
723 "rusmes_smtp_connections_total {}\n",
724 self.smtp_connections_total.load(Ordering::Relaxed)
725 ));
726
727 output
728 .push_str("# HELP rusmes_smtp_messages_received_total Total SMTP messages received\n");
729 output.push_str("# TYPE rusmes_smtp_messages_received_total counter\n");
730 output.push_str(&format!(
731 "rusmes_smtp_messages_received_total {}\n",
732 self.smtp_messages_received.load(Ordering::Relaxed)
733 ));
734
735 output.push_str("# HELP rusmes_smtp_messages_sent_total Total SMTP messages sent\n");
736 output.push_str("# TYPE rusmes_smtp_messages_sent_total counter\n");
737 output.push_str(&format!(
738 "rusmes_smtp_messages_sent_total {}\n",
739 self.smtp_messages_sent.load(Ordering::Relaxed)
740 ));
741
742 output.push_str("# HELP rusmes_smtp_errors_total Total SMTP errors\n");
743 output.push_str("# TYPE rusmes_smtp_errors_total counter\n");
744 output.push_str(&format!(
745 "rusmes_smtp_errors_total {}\n",
746 self.smtp_errors.load(Ordering::Relaxed)
747 ));
748
749 output.push_str(
750 "# HELP rusmes_smtp_auth_success_total Total successful SMTP AUTH exchanges\n",
751 );
752 output.push_str("# TYPE rusmes_smtp_auth_success_total counter\n");
753 output.push_str(&format!(
754 "rusmes_smtp_auth_success_total {}\n",
755 self.smtp_auth_success_total.load(Ordering::Relaxed)
756 ));
757
758 output.push_str("# HELP rusmes_smtp_auth_failure_total Total failed SMTP AUTH exchanges\n");
759 output.push_str("# TYPE rusmes_smtp_auth_failure_total counter\n");
760 output.push_str(&format!(
761 "rusmes_smtp_auth_failure_total {}\n",
762 self.smtp_auth_failure_total.load(Ordering::Relaxed)
763 ));
764
765 output.push_str("# HELP rusmes_smtp_messages_rejected_total Total SMTP messages rejected due to size limit exceeded during DATA\n");
766 output.push_str("# TYPE rusmes_smtp_messages_rejected_total counter\n");
767 output.push_str(&format!(
768 "rusmes_smtp_messages_rejected_total {}\n",
769 self.smtp_messages_rejected_total.load(Ordering::Relaxed)
770 ));
771
772 output.push_str("# HELP rusmes_smtp_connections_rejected_blocked_total Total SMTP connections rejected due to blocked IP\n");
773 output.push_str("# TYPE rusmes_smtp_connections_rejected_blocked_total counter\n");
774 output.push_str(&format!(
775 "rusmes_smtp_connections_rejected_blocked_total {}\n",
776 self.smtp_connections_rejected_blocked
777 .load(Ordering::Relaxed)
778 ));
779
780 output.push_str("# HELP rusmes_smtp_connections_rejected_overload_total Total SMTP connections rejected due to per-IP connection cap exceeded\n");
781 output.push_str("# TYPE rusmes_smtp_connections_rejected_overload_total counter\n");
782 output.push_str(&format!(
783 "rusmes_smtp_connections_rejected_overload_total {}\n",
784 self.smtp_connections_rejected_overload
785 .load(Ordering::Relaxed)
786 ));
787
788 output.push_str("# HELP rusmes_imap_connections_total Total IMAP connections\n");
790 output.push_str("# TYPE rusmes_imap_connections_total counter\n");
791 output.push_str(&format!(
792 "rusmes_imap_connections_total {}\n",
793 self.imap_connections_total.load(Ordering::Relaxed)
794 ));
795
796 output.push_str("# HELP rusmes_imap_commands_total Total IMAP commands\n");
797 output.push_str("# TYPE rusmes_imap_commands_total counter\n");
798 output.push_str(&format!(
799 "rusmes_imap_commands_total {}\n",
800 self.imap_commands_total.load(Ordering::Relaxed)
801 ));
802
803 output.push_str("# HELP rusmes_imap_errors_total Total IMAP errors\n");
804 output.push_str("# TYPE rusmes_imap_errors_total counter\n");
805 output.push_str(&format!(
806 "rusmes_imap_errors_total {}\n",
807 self.imap_errors.load(Ordering::Relaxed)
808 ));
809
810 output.push_str("# HELP rusmes_jmap_requests_total Total JMAP requests\n");
812 output.push_str("# TYPE rusmes_jmap_requests_total counter\n");
813 output.push_str(&format!(
814 "rusmes_jmap_requests_total {}\n",
815 self.jmap_requests_total.load(Ordering::Relaxed)
816 ));
817
818 output.push_str("# HELP rusmes_jmap_errors_total Total JMAP errors\n");
819 output.push_str("# TYPE rusmes_jmap_errors_total counter\n");
820 output.push_str(&format!(
821 "rusmes_jmap_errors_total {}\n",
822 self.jmap_errors.load(Ordering::Relaxed)
823 ));
824
825 output
827 .push_str("# HELP rusmes_push_deliveries_total Total successful WebPush deliveries\n");
828 output.push_str("# TYPE rusmes_push_deliveries_total counter\n");
829 output.push_str(&format!(
830 "rusmes_push_deliveries_total {}\n",
831 self.push_deliveries_total.load(Ordering::Relaxed)
832 ));
833
834 output.push_str(
835 "# HELP rusmes_push_delivery_failures_total Total WebPush final delivery failures\n",
836 );
837 output.push_str("# TYPE rusmes_push_delivery_failures_total counter\n");
838 output.push_str(&format!(
839 "rusmes_push_delivery_failures_total {}\n",
840 self.push_delivery_failures_total.load(Ordering::Relaxed)
841 ));
842
843 output.push_str("# HELP rusmes_mail_processed_total Total mail processed\n");
845 output.push_str("# TYPE rusmes_mail_processed_total counter\n");
846 output.push_str(&format!(
847 "rusmes_mail_processed_total {}\n",
848 self.mail_processed_total.load(Ordering::Relaxed)
849 ));
850
851 output.push_str("# HELP rusmes_mail_delivered_total Total mail delivered\n");
852 output.push_str("# TYPE rusmes_mail_delivered_total counter\n");
853 output.push_str(&format!(
854 "rusmes_mail_delivered_total {}\n",
855 self.mail_delivered_total.load(Ordering::Relaxed)
856 ));
857
858 output.push_str("# HELP rusmes_mail_bounced_total Total mail bounced\n");
859 output.push_str("# TYPE rusmes_mail_bounced_total counter\n");
860 output.push_str(&format!(
861 "rusmes_mail_bounced_total {}\n",
862 self.mail_bounced_total.load(Ordering::Relaxed)
863 ));
864
865 output.push_str("# HELP rusmes_mail_dropped_total Total mail dropped\n");
866 output.push_str("# TYPE rusmes_mail_dropped_total counter\n");
867 output.push_str(&format!(
868 "rusmes_mail_dropped_total {}\n",
869 self.mail_dropped_total.load(Ordering::Relaxed)
870 ));
871
872 output.push_str("# HELP rusmes_queue_size Current queue size\n");
874 output.push_str("# TYPE rusmes_queue_size gauge\n");
875 output.push_str(&format!(
876 "rusmes_queue_size {}\n",
877 self.queue_size.load(Ordering::Relaxed)
878 ));
879
880 output.push_str("# HELP rusmes_queue_retries_total Total queue retries\n");
881 output.push_str("# TYPE rusmes_queue_retries_total counter\n");
882 output.push_str(&format!(
883 "rusmes_queue_retries_total {}\n",
884 self.queue_retries.load(Ordering::Relaxed)
885 ));
886
887 output.push_str("# HELP rusmes_mailboxes_total Total mailboxes\n");
889 output.push_str("# TYPE rusmes_mailboxes_total gauge\n");
890 output.push_str(&format!(
891 "rusmes_mailboxes_total {}\n",
892 self.mailboxes_total.load(Ordering::Relaxed)
893 ));
894
895 output.push_str("# HELP rusmes_messages_total Total messages\n");
896 output.push_str("# TYPE rusmes_messages_total gauge\n");
897 output.push_str(&format!(
898 "rusmes_messages_total {}\n",
899 self.messages_total.load(Ordering::Relaxed)
900 ));
901
902 output.push_str("# HELP rusmes_storage_bytes Total storage bytes\n");
903 output.push_str("# TYPE rusmes_storage_bytes gauge\n");
904 output.push_str(&format!(
905 "rusmes_storage_bytes {}\n",
906 self.storage_bytes.load(Ordering::Relaxed)
907 ));
908
909 output.push_str(&self.message_processing_latency.export(
911 "rusmes_message_processing_latency_seconds",
912 "Message processing latency in seconds",
913 ));
914
915 output.push_str(&self.smtp_session_duration.export(
916 "rusmes_smtp_session_duration_seconds",
917 "SMTP session duration in seconds",
918 ));
919
920 output.push_str(
922 "# HELP rusmes_active_connections Currently open client connections per protocol\n",
923 );
924 output.push_str("# TYPE rusmes_active_connections gauge\n");
925 let mut active: Vec<(String, i64)> = self
927 .active_connections
928 .iter()
929 .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
930 .collect();
931 active.sort_by(|a, b| a.0.cmp(&b.0));
932 for (protocol, value) in active {
933 output.push_str(&format!(
934 "rusmes_active_connections{{protocol=\"{}\"}} {}\n",
935 escape_label_value(&protocol),
936 value
937 ));
938 }
939
940 output.push_str(
942 "# HELP rusmes_tls_sessions_total Total client sessions seen, partitioned by TLS state\n",
943 );
944 output.push_str("# TYPE rusmes_tls_sessions_total counter\n");
945 let mut tls: Vec<(String, u64)> = self
946 .tls_sessions_total
947 .iter()
948 .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
949 .collect();
950 tls.sort_by(|a, b| a.0.cmp(&b.0));
951 for (label, value) in tls {
952 output.push_str(&format!(
953 "rusmes_tls_sessions_total{{tls=\"{}\"}} {}\n",
954 escape_label_value(&label),
955 value
956 ));
957 }
958
959 self.refresh_domain_stats_now();
963 output.push_str(
964 "# HELP rusmes_messages_per_domain_total Total messages enqueued per recipient domain\n",
965 );
966 output.push_str("# TYPE rusmes_messages_per_domain_total counter\n");
967 let mut domains: Vec<(String, u64)> = self
968 .messages_per_domain
969 .iter()
970 .map(|kv| (kv.key().clone(), kv.value().load(Ordering::Relaxed)))
971 .collect();
972 domains.sort_by(|a, b| a.0.cmp(&b.0));
973 for (domain, value) in domains {
974 output.push_str(&format!(
975 "rusmes_messages_per_domain_total{{domain=\"{}\"}} {}\n",
976 escape_label_value(&domain),
977 value
978 ));
979 }
980
981 output
982 }
983
984 pub async fn start_http_server(self, config: MetricsConfig) -> anyhow::Result<()> {
986 if !config.enabled {
987 eprintln!("Metrics HTTP server is disabled");
988 return Ok(());
989 }
990
991 config.validate_bind_address()?;
992 config.validate_path()?;
993
994 let bind_address = config.bind_address.clone();
995 let metrics_path = config.path.clone();
996 let basic_auth = config.basic_auth.clone();
997 let app = self.build_router(&metrics_path, basic_auth);
998
999 eprintln!(
1000 "Starting metrics HTTP server on {}{}",
1001 bind_address, metrics_path
1002 );
1003 eprintln!("Health check endpoints: /health, /ready, /live");
1004
1005 let listener = TcpListener::bind(&bind_address).await?;
1006 axum::serve(listener, app).await?;
1007
1008 Ok(())
1009 }
1010
1011 pub fn build_router(
1016 self,
1017 metrics_path: &str,
1018 basic_auth: Option<MetricsBasicAuthConfig>,
1019 ) -> Router {
1020 let metrics = Arc::new(Mutex::new(self));
1021
1022 let metrics_handler = {
1023 let metrics = Arc::clone(&metrics);
1024 move || {
1025 let metrics = Arc::clone(&metrics);
1026 async move {
1027 let collector = match metrics.lock() {
1028 Ok(guard) => guard,
1029 Err(e) => {
1030 ::tracing::error!("Metrics mutex poisoned: {e}");
1031 return StatusCode::INTERNAL_SERVER_ERROR.into_response();
1032 }
1033 };
1034 let output = collector.export_prometheus();
1035 (
1036 [(header::CONTENT_TYPE, "text/plain; version=0.0.4")],
1037 output,
1038 )
1039 .into_response()
1040 }
1041 }
1042 };
1043
1044 let mut metrics_router = Router::new().route(metrics_path, get(metrics_handler));
1045 if let Some(auth) = basic_auth {
1046 let state = Arc::new(BasicAuthState { config: auth });
1047 metrics_router =
1048 metrics_router.layer(middleware::from_fn_with_state(state, basic_auth_middleware));
1049 }
1050
1051 let health_router = create_health_router();
1052 Router::new().merge(metrics_router).merge(health_router)
1053 }
1054}
1055
/// Shared state for `basic_auth_middleware`: the configured username and
/// bcrypt password hash for the metrics endpoint.
#[derive(Clone)]
struct BasicAuthState {
    config: MetricsBasicAuthConfig,
}
1061
1062async fn basic_auth_middleware(
1067 State(state): State<Arc<BasicAuthState>>,
1068 request: Request,
1069 next: Next,
1070) -> Response {
1071 let header_value = request.headers().get(header::AUTHORIZATION);
1072 if !verify_basic_auth(header_value, &state.config) {
1073 let mut response = StatusCode::UNAUTHORIZED.into_response();
1074 let realm = HeaderValue::from_static("Basic realm=\"rusmes-metrics\", charset=\"UTF-8\"");
1075 response
1076 .headers_mut()
1077 .insert(header::WWW_AUTHENTICATE, realm);
1078 *response.body_mut() = Body::from("401 Unauthorized: metrics endpoint requires basic auth");
1080 return response;
1081 }
1082 next.run(request).await
1083}
1084
1085fn verify_basic_auth(header_value: Option<&HeaderValue>, config: &MetricsBasicAuthConfig) -> bool {
1092 let value = match header_value.and_then(|v| v.to_str().ok()) {
1093 Some(v) => v,
1094 None => return false,
1095 };
1096 let encoded = match value.strip_prefix("Basic ") {
1097 Some(s) => s.trim(),
1098 None => return false,
1099 };
1100 let decoded = match BASE64.decode(encoded) {
1101 Ok(bytes) => bytes,
1102 Err(_) => return false,
1103 };
1104 let credentials = match std::str::from_utf8(&decoded) {
1105 Ok(s) => s,
1106 Err(_) => return false,
1107 };
1108 let (user, password) = match credentials.split_once(':') {
1109 Some(parts) => parts,
1110 None => return false,
1111 };
1112 if !bytes_eq_constant_time(user.as_bytes(), config.username.as_bytes()) {
1113 return false;
1114 }
1115 match bcrypt::verify(password, &config.password_hash) {
1116 Ok(ok) => ok,
1117 Err(e) => {
1118 ::tracing::warn!(
1119 "bcrypt verify failed for metrics basic auth (likely malformed hash in config): {e}"
1120 );
1121 false
1122 }
1123 }
1124}
1125
/// Compares two byte strings without early exit on the first differing byte.
///
/// Inputs of different length are rejected immediately, so only the length
/// (not the content) can leak through timing.
fn bytes_eq_constant_time(a: &[u8], b: &[u8]) -> bool {
    a.len() == b.len() && a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
1137
/// Escapes a Prometheus label value: backslash, double quote and line feed
/// become `\\`, `\"` and `\n`; every other character is copied as-is.
fn escape_label_value(value: &str) -> String {
    value
        .chars()
        .fold(String::with_capacity(value.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str(r"\\"),
                '"' => escaped.push_str(r#"\""#),
                '\n' => escaped.push_str(r"\n"),
                _ => escaped.push(ch),
            }
            escaped
        })
}
1155
/// JSON body of `GET /health`.
#[derive(Debug, Serialize, Clone)]
pub struct HealthResponse {
    /// `"healthy"` when every individual check reported `"healthy"`, otherwise `"unhealthy"`.
    pub status: String,
    /// Per-subsystem results.
    pub checks: HealthChecks,
}
1162
/// Individual results reported inside [`HealthResponse`].
#[derive(Debug, Serialize, Clone)]
pub struct HealthChecks {
    /// Result of the storage check (`"healthy"` counts as passing).
    pub storage: String,
    /// Result of the queue check (`"healthy"` counts as passing).
    pub queue: String,
}
1169
/// JSON body of `GET /ready`.
#[derive(Debug, Serialize, Clone)]
pub struct ReadyResponse {
    /// Whether the service reports itself ready to take traffic.
    pub ready: bool,
}
1175
/// JSON body of `GET /live`.
#[derive(Debug, Serialize, Clone)]
pub struct LiveResponse {
    /// Always `true` when the process is able to answer the request.
    pub alive: bool,
}
1181
1182async fn health_check() -> (StatusCode, Json<HealthResponse>) {
1184 let storage_status = check_storage().await;
1185 let queue_status = check_queue().await;
1186
1187 let all_healthy = storage_status == "healthy" && queue_status == "healthy";
1188 let status_code = if all_healthy {
1189 StatusCode::OK
1190 } else {
1191 StatusCode::SERVICE_UNAVAILABLE
1192 };
1193
1194 let response = HealthResponse {
1195 status: if all_healthy {
1196 "healthy".to_string()
1197 } else {
1198 "unhealthy".to_string()
1199 },
1200 checks: HealthChecks {
1201 storage: storage_status,
1202 queue: queue_status,
1203 },
1204 };
1205
1206 (status_code, Json(response))
1207}
1208
1209async fn readiness_check() -> (StatusCode, Json<ReadyResponse>) {
1211 let ready = true;
1212
1213 let status_code = if ready {
1214 StatusCode::OK
1215 } else {
1216 StatusCode::SERVICE_UNAVAILABLE
1217 };
1218
1219 (status_code, Json(ReadyResponse { ready }))
1220}
1221
1222async fn liveness_check() -> (StatusCode, Json<LiveResponse>) {
1224 (StatusCode::OK, Json(LiveResponse { alive: true }))
1225}
1226
/// Storage health probe. Currently a placeholder that always reports `"healthy"`.
async fn check_storage() -> String {
    String::from("healthy")
}

/// Queue health probe. Currently a placeholder that always reports `"healthy"`.
async fn check_queue() -> String {
    String::from("healthy")
}
1236
1237pub fn create_health_router() -> Router {
1239 Router::new()
1240 .route("/health", get(health_check))
1241 .route("/ready", get(readiness_check))
1242 .route("/live", get(liveness_check))
1243}
1244
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request as HttpRequest;
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    /// Sends one `GET uri` through `app`. When `authorization` is given it is
    /// attached verbatim as the `Authorization` header value.
    async fn send_get(
        app: Router,
        uri: &str,
        authorization: Option<String>,
    ) -> axum::response::Response {
        let builder = HttpRequest::builder().uri(uri);
        let builder = match authorization {
            Some(value) => builder.header(header::AUTHORIZATION, value),
            None => builder,
        };
        let request = builder.body(Body::empty()).expect("request build");
        app.oneshot(request).await.expect("router call")
    }

    /// Collects the whole response body and decodes it as UTF-8.
    async fn read_body(resp: axum::response::Response) -> String {
        let bytes = resp
            .into_body()
            .collect()
            .await
            .expect("collect")
            .to_bytes();
        String::from_utf8(bytes.to_vec()).expect("utf-8 body")
    }

    /// Formats `user:pass` as an HTTP Basic `Authorization` header value.
    fn basic_auth_header(user_and_pass: &str) -> String {
        format!("Basic {}", BASE64.encode(user_and_pass))
    }

    /// Builds a `/metrics` router over a fresh collector. `creds`, when
    /// present, is turned into a Basic-auth config with a bcrypt hash of
    /// the password (cost 4 keeps the test fast).
    fn router_with_basic_auth(creds: Option<(&str, &str)>) -> Router {
        let collector = MetricsCollector::new();
        let auth = creds.map(|(user, pass)| MetricsBasicAuthConfig {
            username: user.to_string(),
            password_hash: bcrypt::hash(pass, 4).expect("bcrypt hash for test"),
        });
        collector.build_router("/metrics", auth)
    }

    #[test]
    fn test_metrics_collector() {
        let metrics = MetricsCollector::new();

        // Bump each counter exactly once, then confirm each reads back 1.
        metrics.inc_smtp_connections();
        metrics.inc_smtp_messages_received();
        metrics.inc_mail_processed();
        metrics.inc_mail_delivered();

        assert_eq!(metrics.smtp_connections_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.smtp_messages_received.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_processed_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_delivered_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = MetricsCollector::new();
        metrics.inc_smtp_connections();
        metrics.set_queue_size(42);

        let exposition = metrics.export_prometheus();

        let expected_fragments = [
            "rusmes_smtp_connections_total 1",
            "rusmes_queue_size 42",
            "# HELP",
            "# TYPE",
        ];
        for fragment in expected_fragments {
            assert!(exposition.contains(fragment), "missing {fragment:?}");
        }
    }

    #[test]
    fn test_active_connections_guard_roundtrip() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("smtp"), 0);

        // Nested guards on the same protocol stack up and unwind in order.
        let outer = metrics.connection_guard("smtp");
        assert_eq!(metrics.active_connections("smtp"), 1);
        let inner = metrics.connection_guard("smtp");
        assert_eq!(metrics.active_connections("smtp"), 2);
        drop(inner);
        assert_eq!(metrics.active_connections("smtp"), 1);
        drop(outer);
        assert_eq!(metrics.active_connections("smtp"), 0);

        // A guard on another protocol must not leak into the smtp gauge.
        let _imap = metrics.connection_guard("imap");
        assert_eq!(metrics.active_connections("imap"), 1);
        assert_eq!(metrics.active_connections("smtp"), 0);
    }

    #[test]
    fn test_tls_session_counter_labels() {
        let metrics = MetricsCollector::new();
        for label in [
            tls_label::NO,
            tls_label::NO,
            tls_label::STARTTLS,
            tls_label::YES,
        ] {
            metrics.inc_tls_session(label);
        }

        for (label, expected) in [
            (tls_label::NO, 2),
            (tls_label::STARTTLS, 1),
            (tls_label::YES, 1),
        ] {
            assert_eq!(metrics.tls_session_count(label), expected, "label {label}");
        }

        let exposition = metrics.export_prometheus();
        for fragment in [
            "rusmes_tls_sessions_total{tls=\"no\"} 2",
            "rusmes_tls_sessions_total{tls=\"starttls\"} 1",
            "rusmes_tls_sessions_total{tls=\"yes\"} 1",
            "# TYPE rusmes_tls_sessions_total counter",
        ] {
            assert!(exposition.contains(fragment), "missing {fragment:?}");
        }
    }

    #[test]
    fn test_messages_per_domain_from_callback_source() {
        let metrics = MetricsCollector::new();
        metrics.set_domain_stats_source(Arc::new(|| {
            HashMap::from([
                ("example.com".to_string(), 5u64),
                ("example.org".to_string(), 3u64),
            ])
        }));

        let exposition = metrics.export_prometheus();
        assert!(
            exposition.contains("rusmes_messages_per_domain_total{domain=\"example.com\"} 5"),
            "exposition was:\n{exposition}"
        );
        assert!(exposition.contains("rusmes_messages_per_domain_total{domain=\"example.org\"} 3"));
        assert!(exposition.contains("# TYPE rusmes_messages_per_domain_total counter"));
    }

    #[test]
    fn test_escape_label_value_quotes_and_backslash() {
        let cases = [
            ("plain", "plain"),
            ("a\"b", "a\\\"b"),
            ("a\\b", "a\\\\b"),
            ("a\nb", "a\\nb"),
        ];
        for (raw, escaped) in cases {
            assert_eq!(escape_label_value(raw), escaped, "input {raw:?}");
        }
    }

    #[test]
    fn test_constant_time_eq() {
        let cases: [(&[u8], &[u8], bool); 4] = [
            (b"abc", b"abc", true),
            (b"abc", b"abd", false),
            (b"abc", b"abcd", false),
            (b"", b"", true),
        ];
        for (lhs, rhs, equal) in cases {
            assert_eq!(bytes_eq_constant_time(lhs, rhs), equal, "{lhs:?} vs {rhs:?}");
        }
    }

    #[tokio::test]
    async fn test_metrics_basic_auth_accepts_correct_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        // Without credentials the endpoint must challenge the client.
        let challenged = send_get(app.clone(), "/metrics", None).await;
        assert_eq!(challenged.status(), StatusCode::UNAUTHORIZED);
        let www_auth = challenged
            .headers()
            .get(header::WWW_AUTHENTICATE)
            .expect("WWW-Authenticate header on 401")
            .to_str()
            .expect("ascii header");
        assert!(www_auth.starts_with("Basic realm="), "got: {www_auth}");

        // With the right credentials the exposition is served.
        let accepted = send_get(
            app.clone(),
            "/metrics",
            Some(basic_auth_header("scrape:s3cret")),
        )
        .await;
        assert_eq!(accepted.status(), StatusCode::OK);
        let text = read_body(accepted).await;
        assert!(text.contains("# HELP"), "body was:\n{text}");
    }

    #[tokio::test]
    async fn test_metrics_basic_auth_rejects_wrong_credentials() {
        let app = router_with_basic_auth(Some(("scrape", "s3cret")));

        // Wrong username, then wrong password: both must be refused.
        for bad in ["wrong:s3cret", "scrape:wrong"] {
            let resp = send_get(app.clone(), "/metrics", Some(basic_auth_header(bad))).await;
            assert_eq!(
                resp.status(),
                StatusCode::UNAUTHORIZED,
                "credentials {bad:?} must be rejected"
            );
        }
    }

    #[tokio::test]
    async fn test_metrics_no_basic_auth_serves_anonymously() {
        let app = router_with_basic_auth(None);
        let resp = send_get(app, "/metrics", None).await;
        assert_eq!(resp.status(), StatusCode::OK);
    }

    #[test]
    fn test_connection_guard_increments_and_decrements() {
        let metrics = MetricsCollector::new();
        assert_eq!(metrics.active_connections("pop3"), 0);

        let pop3 = metrics.connection_guard("pop3");
        assert_eq!(metrics.active_connections("pop3"), 1);

        drop(pop3);
        assert_eq!(metrics.active_connections("pop3"), 0);
    }

    #[test]
    fn test_connection_metrics_total() {
        let metrics = MetricsCollector::new();
        let protocols = ["smtp", "imap", "pop3"];

        // Hold one guard per protocol for the rest of the test.
        let _guards: Vec<_> = protocols
            .into_iter()
            .map(|protocol| metrics.connection_guard(protocol))
            .collect();

        for protocol in protocols {
            assert_eq!(metrics.active_connections(protocol), 1, "protocol {protocol}");
        }
    }

    #[test]
    fn test_prometheus_format() {
        let metrics = MetricsCollector::new();
        let _smtp = metrics.connection_guard("smtp");

        let exposition = metrics.export_prometheus();
        assert!(
            exposition.contains("rusmes_active_connections{protocol=\"smtp\"} 1"),
            "prometheus output was:\n{exposition}"
        );
        assert!(exposition.contains("# TYPE rusmes_active_connections gauge"));
    }

    #[tokio::test]
    async fn test_metrics_server_responds() {
        let metrics = MetricsCollector::new();
        let _smtp = metrics.connection_guard("smtp");
        let app = metrics.build_router("/metrics", None);

        let resp = send_get(app, "/metrics", None).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let text = read_body(resp).await;
        assert!(
            text.contains("rusmes_active_connections"),
            "body was:\n{text}"
        );
    }

    #[test]
    fn test_global_metrics_singleton() {
        assert!(
            std::ptr::eq(global_metrics(), global_metrics()),
            "global_metrics() must return the same instance"
        );
    }
}