1pub mod tracing;
82
83use axum::{http::StatusCode, response::IntoResponse, routing::get, Json, Router};
84use rusmes_config::MetricsConfig;
85use rusmes_proto::Mail;
86use serde::Serialize;
87use std::sync::atomic::{AtomicU64, Ordering};
88use std::sync::{Arc, Mutex};
89use std::time::Instant;
90use tokio::net::TcpListener;
91
/// Fixed-bucket histogram rendered in the Prometheus text exposition format.
///
/// All state is held in `Arc`-wrapped atomics, so observations can be recorded
/// through a shared reference and clones share the same counters.
#[derive(Debug, Clone)]
struct Histogram {
    /// Bucket upper bounds in ascending order (sorted by `Histogram::new`).
    buckets: Vec<f64>,
    /// Cumulative counts: `counts[i]` is the number of observations `<= buckets[i]`.
    counts: Vec<Arc<AtomicU64>>,
    /// Sum of all observations, stored as the raw bits of an `f64` so that
    /// sub-millisecond values are accumulated without truncation.
    sum: Arc<AtomicU64>,
    /// Total number of observations; also the value of the implicit `+Inf` bucket.
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Creates an empty histogram with the given bucket upper bounds.
    ///
    /// Bounds are sorted ascending and de-duplicated because the exposition
    /// format expects `le` labels in increasing order.
    fn new(mut buckets: Vec<f64>) -> Self {
        buckets.sort_unstable_by(f64::total_cmp);
        buckets.dedup();
        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets,
            counts,
            // `0u64` is the bit pattern of `0.0_f64`.
            sum: Arc::new(AtomicU64::new(0)),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records one observation (in the histogram's unit, e.g. seconds).
    ///
    /// NaN values are ignored: they match no bucket and would turn the running
    /// sum into NaN permanently.
    fn observe(&self, value: f64) {
        if value.is_nan() {
            return;
        }

        // Atomically add to the f64 sum held as bits; the closure always returns
        // `Some`, so `fetch_update` cannot fail.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
        self.count.fetch_add(1, Ordering::Relaxed);

        // Buckets are cumulative: every bound >= value gets the observation.
        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Renders the histogram as a Prometheus `histogram` family named `name`.
    ///
    /// Emits `# HELP`/`# TYPE`, one `_bucket` line per bound plus `+Inf`,
    /// then `_sum` and `_count`.
    fn export(&self, name: &str, help: &str) -> String {
        use std::fmt::Write;

        let mut output = String::new();
        // `fmt::Write` for `String` never fails, so the results are discarded.
        let _ = writeln!(output, "# HELP {name} {help}");
        let _ = writeln!(output, "# TYPE {name} histogram");

        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            let observed = counter.load(Ordering::Relaxed);
            let _ = writeln!(output, "{name}_bucket{{le=\"{bound}\"}} {observed}");
        }

        // Load the total once so the `+Inf` bucket and `_count` always agree.
        let total = self.count.load(Ordering::Relaxed);
        let sum = f64::from_bits(self.sum.load(Ordering::Relaxed));
        let _ = writeln!(output, "{name}_bucket{{le=\"+Inf\"}} {total}");
        let _ = writeln!(output, "{name}_sum {sum}");
        let _ = writeln!(output, "{name}_count {total}");

        output
    }
}
156
157pub struct Timer {
159 start: Instant,
160 histogram: Arc<Histogram>,
161}
162
163impl Timer {
164 fn new(histogram: Arc<Histogram>) -> Self {
165 Self {
166 start: Instant::now(),
167 histogram,
168 }
169 }
170
171 pub fn observe(self) {
172 let duration = self.start.elapsed().as_secs_f64();
173 self.histogram.observe(duration);
174 }
175}
176
/// Process-wide metrics registry rendered in the Prometheus text format.
///
/// Every metric is an `Arc`-wrapped atomic, so updates only need `&self` and
/// clones of the collector share the same underlying counters.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
    // SMTP activity (counters, only ever incremented).
    smtp_connections_total: Arc<AtomicU64>,
    smtp_messages_received: Arc<AtomicU64>,
    smtp_messages_sent: Arc<AtomicU64>,
    smtp_errors: Arc<AtomicU64>,

    // IMAP activity (counters).
    imap_connections_total: Arc<AtomicU64>,
    imap_commands_total: Arc<AtomicU64>,
    imap_errors: Arc<AtomicU64>,

    // JMAP activity (counters).
    jmap_requests_total: Arc<AtomicU64>,
    jmap_errors: Arc<AtomicU64>,

    // Mail pipeline outcomes (counters).
    mail_processed_total: Arc<AtomicU64>,
    mail_delivered_total: Arc<AtomicU64>,
    mail_bounced_total: Arc<AtomicU64>,
    mail_dropped_total: Arc<AtomicU64>,

    // Queue state: `queue_size` is a gauge (overwritten), `queue_retries` a counter.
    queue_size: Arc<AtomicU64>,
    queue_retries: Arc<AtomicU64>,

    // Storage gauges, overwritten via the `set_*` methods.
    mailboxes_total: Arc<AtomicU64>,
    messages_total: Arc<AtomicU64>,
    storage_bytes: Arc<AtomicU64>,

    // Latency histograms (seconds), fed by `Timer`.
    message_processing_latency: Arc<Histogram>,
    smtp_session_duration: Arc<Histogram>,
}
214
215impl Default for MetricsCollector {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
221impl MetricsCollector {
222 pub fn new() -> Self {
224 let latency_buckets = vec![
226 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
227 ];
228 let duration_buckets = vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0];
230
231 Self {
232 smtp_connections_total: Arc::new(AtomicU64::new(0)),
233 smtp_messages_received: Arc::new(AtomicU64::new(0)),
234 smtp_messages_sent: Arc::new(AtomicU64::new(0)),
235 smtp_errors: Arc::new(AtomicU64::new(0)),
236 imap_connections_total: Arc::new(AtomicU64::new(0)),
237 imap_commands_total: Arc::new(AtomicU64::new(0)),
238 imap_errors: Arc::new(AtomicU64::new(0)),
239 jmap_requests_total: Arc::new(AtomicU64::new(0)),
240 jmap_errors: Arc::new(AtomicU64::new(0)),
241 mail_processed_total: Arc::new(AtomicU64::new(0)),
242 mail_delivered_total: Arc::new(AtomicU64::new(0)),
243 mail_bounced_total: Arc::new(AtomicU64::new(0)),
244 mail_dropped_total: Arc::new(AtomicU64::new(0)),
245 queue_size: Arc::new(AtomicU64::new(0)),
246 queue_retries: Arc::new(AtomicU64::new(0)),
247 mailboxes_total: Arc::new(AtomicU64::new(0)),
248 messages_total: Arc::new(AtomicU64::new(0)),
249 storage_bytes: Arc::new(AtomicU64::new(0)),
250 message_processing_latency: Arc::new(Histogram::new(latency_buckets)),
251 smtp_session_duration: Arc::new(Histogram::new(duration_buckets)),
252 }
253 }
254
255 pub fn record_mail_completed(&self, _mail: &Mail) {
257 self.inc_mail_processed();
258 self.inc_mail_delivered();
259 }
260
261 pub fn inc_smtp_connections(&self) {
263 self.smtp_connections_total.fetch_add(1, Ordering::Relaxed);
264 }
265
266 pub fn inc_smtp_messages_received(&self) {
267 self.smtp_messages_received.fetch_add(1, Ordering::Relaxed);
268 }
269
270 pub fn inc_smtp_messages_sent(&self) {
271 self.smtp_messages_sent.fetch_add(1, Ordering::Relaxed);
272 }
273
274 pub fn inc_smtp_errors(&self) {
275 self.smtp_errors.fetch_add(1, Ordering::Relaxed);
276 }
277
278 pub fn inc_imap_connections(&self) {
280 self.imap_connections_total.fetch_add(1, Ordering::Relaxed);
281 }
282
283 pub fn inc_imap_commands(&self) {
284 self.imap_commands_total.fetch_add(1, Ordering::Relaxed);
285 }
286
287 pub fn inc_imap_errors(&self) {
288 self.imap_errors.fetch_add(1, Ordering::Relaxed);
289 }
290
291 pub fn inc_jmap_requests(&self) {
293 self.jmap_requests_total.fetch_add(1, Ordering::Relaxed);
294 }
295
296 pub fn inc_jmap_errors(&self) {
297 self.jmap_errors.fetch_add(1, Ordering::Relaxed);
298 }
299
300 pub fn inc_mail_processed(&self) {
302 self.mail_processed_total.fetch_add(1, Ordering::Relaxed);
303 }
304
305 pub fn inc_mail_delivered(&self) {
306 self.mail_delivered_total.fetch_add(1, Ordering::Relaxed);
307 }
308
309 pub fn inc_mail_bounced(&self) {
310 self.mail_bounced_total.fetch_add(1, Ordering::Relaxed);
311 }
312
313 pub fn inc_mail_dropped(&self) {
314 self.mail_dropped_total.fetch_add(1, Ordering::Relaxed);
315 }
316
317 pub fn set_queue_size(&self, size: u64) {
319 self.queue_size.store(size, Ordering::Relaxed);
320 }
321
322 pub fn inc_queue_retries(&self) {
323 self.queue_retries.fetch_add(1, Ordering::Relaxed);
324 }
325
326 pub fn set_mailboxes_total(&self, count: u64) {
328 self.mailboxes_total.store(count, Ordering::Relaxed);
329 }
330
331 pub fn set_messages_total(&self, count: u64) {
332 self.messages_total.store(count, Ordering::Relaxed);
333 }
334
335 pub fn set_storage_bytes(&self, bytes: u64) {
336 self.storage_bytes.store(bytes, Ordering::Relaxed);
337 }
338
339 pub fn start_message_processing_timer(&self) -> Timer {
341 Timer::new(Arc::clone(&self.message_processing_latency))
342 }
343
344 pub fn start_smtp_session_timer(&self) -> Timer {
345 Timer::new(Arc::clone(&self.smtp_session_duration))
346 }
347
348 pub fn export_prometheus(&self) -> String {
350 let mut output = String::new();
351
352 output.push_str("# HELP rusmes_smtp_connections_total Total SMTP connections\n");
354 output.push_str("# TYPE rusmes_smtp_connections_total counter\n");
355 output.push_str(&format!(
356 "rusmes_smtp_connections_total {}\n",
357 self.smtp_connections_total.load(Ordering::Relaxed)
358 ));
359
360 output
361 .push_str("# HELP rusmes_smtp_messages_received_total Total SMTP messages received\n");
362 output.push_str("# TYPE rusmes_smtp_messages_received_total counter\n");
363 output.push_str(&format!(
364 "rusmes_smtp_messages_received_total {}\n",
365 self.smtp_messages_received.load(Ordering::Relaxed)
366 ));
367
368 output.push_str("# HELP rusmes_smtp_messages_sent_total Total SMTP messages sent\n");
369 output.push_str("# TYPE rusmes_smtp_messages_sent_total counter\n");
370 output.push_str(&format!(
371 "rusmes_smtp_messages_sent_total {}\n",
372 self.smtp_messages_sent.load(Ordering::Relaxed)
373 ));
374
375 output.push_str("# HELP rusmes_smtp_errors_total Total SMTP errors\n");
376 output.push_str("# TYPE rusmes_smtp_errors_total counter\n");
377 output.push_str(&format!(
378 "rusmes_smtp_errors_total {}\n",
379 self.smtp_errors.load(Ordering::Relaxed)
380 ));
381
382 output.push_str("# HELP rusmes_imap_connections_total Total IMAP connections\n");
384 output.push_str("# TYPE rusmes_imap_connections_total counter\n");
385 output.push_str(&format!(
386 "rusmes_imap_connections_total {}\n",
387 self.imap_connections_total.load(Ordering::Relaxed)
388 ));
389
390 output.push_str("# HELP rusmes_imap_commands_total Total IMAP commands\n");
391 output.push_str("# TYPE rusmes_imap_commands_total counter\n");
392 output.push_str(&format!(
393 "rusmes_imap_commands_total {}\n",
394 self.imap_commands_total.load(Ordering::Relaxed)
395 ));
396
397 output.push_str("# HELP rusmes_imap_errors_total Total IMAP errors\n");
398 output.push_str("# TYPE rusmes_imap_errors_total counter\n");
399 output.push_str(&format!(
400 "rusmes_imap_errors_total {}\n",
401 self.imap_errors.load(Ordering::Relaxed)
402 ));
403
404 output.push_str("# HELP rusmes_jmap_requests_total Total JMAP requests\n");
406 output.push_str("# TYPE rusmes_jmap_requests_total counter\n");
407 output.push_str(&format!(
408 "rusmes_jmap_requests_total {}\n",
409 self.jmap_requests_total.load(Ordering::Relaxed)
410 ));
411
412 output.push_str("# HELP rusmes_jmap_errors_total Total JMAP errors\n");
413 output.push_str("# TYPE rusmes_jmap_errors_total counter\n");
414 output.push_str(&format!(
415 "rusmes_jmap_errors_total {}\n",
416 self.jmap_errors.load(Ordering::Relaxed)
417 ));
418
419 output.push_str("# HELP rusmes_mail_processed_total Total mail processed\n");
421 output.push_str("# TYPE rusmes_mail_processed_total counter\n");
422 output.push_str(&format!(
423 "rusmes_mail_processed_total {}\n",
424 self.mail_processed_total.load(Ordering::Relaxed)
425 ));
426
427 output.push_str("# HELP rusmes_mail_delivered_total Total mail delivered\n");
428 output.push_str("# TYPE rusmes_mail_delivered_total counter\n");
429 output.push_str(&format!(
430 "rusmes_mail_delivered_total {}\n",
431 self.mail_delivered_total.load(Ordering::Relaxed)
432 ));
433
434 output.push_str("# HELP rusmes_mail_bounced_total Total mail bounced\n");
435 output.push_str("# TYPE rusmes_mail_bounced_total counter\n");
436 output.push_str(&format!(
437 "rusmes_mail_bounced_total {}\n",
438 self.mail_bounced_total.load(Ordering::Relaxed)
439 ));
440
441 output.push_str("# HELP rusmes_mail_dropped_total Total mail dropped\n");
442 output.push_str("# TYPE rusmes_mail_dropped_total counter\n");
443 output.push_str(&format!(
444 "rusmes_mail_dropped_total {}\n",
445 self.mail_dropped_total.load(Ordering::Relaxed)
446 ));
447
448 output.push_str("# HELP rusmes_queue_size Current queue size\n");
450 output.push_str("# TYPE rusmes_queue_size gauge\n");
451 output.push_str(&format!(
452 "rusmes_queue_size {}\n",
453 self.queue_size.load(Ordering::Relaxed)
454 ));
455
456 output.push_str("# HELP rusmes_queue_retries_total Total queue retries\n");
457 output.push_str("# TYPE rusmes_queue_retries_total counter\n");
458 output.push_str(&format!(
459 "rusmes_queue_retries_total {}\n",
460 self.queue_retries.load(Ordering::Relaxed)
461 ));
462
463 output.push_str("# HELP rusmes_mailboxes_total Total mailboxes\n");
465 output.push_str("# TYPE rusmes_mailboxes_total gauge\n");
466 output.push_str(&format!(
467 "rusmes_mailboxes_total {}\n",
468 self.mailboxes_total.load(Ordering::Relaxed)
469 ));
470
471 output.push_str("# HELP rusmes_messages_total Total messages\n");
472 output.push_str("# TYPE rusmes_messages_total gauge\n");
473 output.push_str(&format!(
474 "rusmes_messages_total {}\n",
475 self.messages_total.load(Ordering::Relaxed)
476 ));
477
478 output.push_str("# HELP rusmes_storage_bytes Total storage bytes\n");
479 output.push_str("# TYPE rusmes_storage_bytes gauge\n");
480 output.push_str(&format!(
481 "rusmes_storage_bytes {}\n",
482 self.storage_bytes.load(Ordering::Relaxed)
483 ));
484
485 output.push_str(&self.message_processing_latency.export(
487 "rusmes_message_processing_latency_seconds",
488 "Message processing latency in seconds",
489 ));
490
491 output.push_str(&self.smtp_session_duration.export(
492 "rusmes_smtp_session_duration_seconds",
493 "SMTP session duration in seconds",
494 ));
495
496 output
497 }
498
499 pub async fn start_http_server(self, config: MetricsConfig) -> anyhow::Result<()> {
501 if !config.enabled {
502 eprintln!("Metrics HTTP server is disabled");
503 return Ok(());
504 }
505
506 config.validate_bind_address()?;
507 config.validate_path()?;
508
509 let metrics = Arc::new(Mutex::new(self));
510 let metrics_path = config.path.clone();
511
512 let metrics_router = Router::new().route(
513 &metrics_path,
514 get({
515 let metrics = Arc::clone(&metrics);
516 move || {
517 let metrics = Arc::clone(&metrics);
518 async move {
519 let collector = match metrics.lock() {
520 Ok(guard) => guard,
521 Err(e) => {
522 eprintln!("Metrics mutex poisoned: {e}");
523 return axum::http::StatusCode::INTERNAL_SERVER_ERROR
524 .into_response();
525 }
526 };
527 let output = collector.export_prometheus();
528 (
529 [(
530 axum::http::header::CONTENT_TYPE,
531 "text/plain; version=0.0.4",
532 )],
533 output,
534 )
535 .into_response()
536 }
537 }
538 }),
539 );
540
541 let health_router = create_health_router();
542 let app = Router::new().merge(metrics_router).merge(health_router);
543
544 eprintln!(
545 "Starting metrics HTTP server on {}{}",
546 config.bind_address, metrics_path
547 );
548 eprintln!("Health check endpoints: /health, /ready, /live");
549
550 let listener = TcpListener::bind(&config.bind_address).await?;
551 axum::serve(listener, app).await?;
552
553 Ok(())
554 }
555}
556
/// JSON body returned by the `/health` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct HealthResponse {
    /// Aggregate status: `"healthy"` when every individual check reported
    /// `"healthy"`, otherwise `"unhealthy"` (see `health_check`).
    pub status: String,
    /// Per-component check results.
    pub checks: HealthChecks,
}
563
/// Individual component results embedded in a [`HealthResponse`].
#[derive(Debug, Serialize, Clone)]
pub struct HealthChecks {
    /// Result string from the storage check (`"healthy"` when it passes).
    pub storage: String,
    /// Result string from the queue check (`"healthy"` when it passes).
    pub queue: String,
}
570
/// JSON body returned by the `/ready` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct ReadyResponse {
    /// Readiness flag; the handler pairs `true` with 200 and `false` with 503.
    pub ready: bool,
}
576
/// JSON body returned by the `/live` endpoint.
#[derive(Debug, Serialize, Clone)]
pub struct LiveResponse {
    /// Always `true` when the liveness handler responds.
    pub alive: bool,
}
582
583async fn health_check() -> (StatusCode, Json<HealthResponse>) {
585 let storage_status = check_storage().await;
586 let queue_status = check_queue().await;
587
588 let all_healthy = storage_status == "healthy" && queue_status == "healthy";
589 let status_code = if all_healthy {
590 StatusCode::OK
591 } else {
592 StatusCode::SERVICE_UNAVAILABLE
593 };
594
595 let response = HealthResponse {
596 status: if all_healthy {
597 "healthy".to_string()
598 } else {
599 "unhealthy".to_string()
600 },
601 checks: HealthChecks {
602 storage: storage_status,
603 queue: queue_status,
604 },
605 };
606
607 (status_code, Json(response))
608}
609
610async fn readiness_check() -> (StatusCode, Json<ReadyResponse>) {
612 let ready = true;
613
614 let status_code = if ready {
615 StatusCode::OK
616 } else {
617 StatusCode::SERVICE_UNAVAILABLE
618 };
619
620 (status_code, Json(ReadyResponse { ready }))
621}
622
623async fn liveness_check() -> (StatusCode, Json<LiveResponse>) {
625 (StatusCode::OK, Json(LiveResponse { alive: true }))
626}
627
/// Reports the storage component's health as a status string.
///
/// NOTE(review): currently a stub that always returns `"healthy"`; no storage
/// probe is performed.
async fn check_storage() -> String {
    String::from("healthy")
}
632
/// Reports the mail queue's health as a status string.
///
/// NOTE(review): currently a stub that always returns `"healthy"`; the queue
/// is not actually inspected.
async fn check_queue() -> String {
    "healthy".to_owned()
}
637
638pub fn create_health_router() -> Router {
640 Router::new()
641 .route("/health", get(health_check))
642 .route("/ready", get(readiness_check))
643 .route("/live", get(liveness_check))
644}
645
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_collector() {
        let metrics = MetricsCollector::new();

        metrics.inc_smtp_connections();
        metrics.inc_smtp_messages_received();
        metrics.inc_mail_processed();
        metrics.inc_mail_delivered();

        assert_eq!(metrics.smtp_connections_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.smtp_messages_received.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_processed_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.mail_delivered_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = MetricsCollector::new();
        metrics.inc_smtp_connections();
        metrics.set_queue_size(42);

        let output = metrics.export_prometheus();

        assert!(output.contains("rusmes_smtp_connections_total 1"));
        assert!(output.contains("rusmes_queue_size 42"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    #[test]
    fn test_default_starts_at_zero() {
        let output = MetricsCollector::default().export_prometheus();

        assert!(output.contains("rusmes_smtp_connections_total 0\n"));
        assert!(output.contains("rusmes_storage_bytes 0\n"));
    }

    #[test]
    fn test_every_family_has_help_and_type() {
        let output = MetricsCollector::new().export_prometheus();

        // 18 scalar families plus the two histograms.
        assert_eq!(output.matches("# HELP ").count(), 20);
        assert_eq!(output.matches("# TYPE ").count(), 20);
    }

    #[test]
    fn test_clones_share_counters() {
        let metrics = MetricsCollector::new();
        let clone = metrics.clone();

        clone.inc_smtp_errors();

        assert_eq!(metrics.smtp_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_histogram_buckets_are_cumulative() {
        let histogram = Histogram::new(vec![0.1, 1.0]);
        histogram.observe(0.05);
        histogram.observe(0.5);
        histogram.observe(5.0);

        let output = histogram.export("h", "help");

        assert!(output.contains("h_bucket{le=\"0.1\"} 1\n"));
        assert!(output.contains("h_bucket{le=\"1\"} 2\n"));
        assert!(output.contains("h_bucket{le=\"+Inf\"} 3\n"));
        assert!(output.contains("h_count 3\n"));
    }

    #[test]
    fn test_timer_records_into_its_histogram() {
        let metrics = MetricsCollector::new();
        metrics.start_message_processing_timer().observe();

        let output = metrics.export_prometheus();

        assert!(output.contains("rusmes_message_processing_latency_seconds_count 1\n"));
        assert!(output.contains("rusmes_smtp_session_duration_seconds_count 0\n"));
    }
}