// axum_tasks/metrics.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicU64, Ordering};
4
/// Lock-free counters describing the activity of the task queue.
///
/// Every field is an [`AtomicU64`], so events can be recorded through a
/// shared reference without taking a lock.
///
/// The derived [`Default`] zeroes *every* field, including `reset_at`, which
/// makes uptime appear to start at the Unix epoch. Prefer
/// [`TaskMetrics::new`], which stamps `reset_at` with the current time.
#[derive(Debug, Default)]
pub struct TaskMetrics {
    /// Tasks enqueued since the last reset.
    tasks_queued: AtomicU64,
    /// Tasks that finished successfully since the last reset.
    tasks_completed: AtomicU64,
    /// Tasks that finished with a failure since the last reset.
    tasks_failed: AtomicU64,
    /// Retry attempts recorded since the last reset.
    tasks_retried: AtomicU64,
    /// Tasks enqueued but not yet recorded as completed or failed.
    queue_depth: AtomicU64,
    /// Sum of all durations reported via `record_processing_time`, in
    /// milliseconds.
    total_processing_time_ms: AtomicU64,
    /// Highest `queue_depth` observed since the last reset.
    peak_queue_depth: AtomicU64,
    /// Unix timestamp (seconds) of construction via `new` or of the most
    /// recent reset.
    reset_at: AtomicU64,
}
16
17impl TaskMetrics {
18    pub fn new() -> Self {
19        Self {
20            reset_at: AtomicU64::new(chrono::Utc::now().timestamp() as u64),
21            ..Default::default()
22        }
23    }
24
25    pub fn record_queued(&self) {
26        self.tasks_queued.fetch_add(1, Ordering::Relaxed);
27        let new_depth = self.queue_depth.fetch_add(1, Ordering::Relaxed) + 1;
28        self.update_peak_depth(new_depth);
29    }
30
31    pub fn record_completed(&self) {
32        self.tasks_completed.fetch_add(1, Ordering::Relaxed);
33        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
34    }
35
36    pub fn record_failed(&self) {
37        self.tasks_failed.fetch_add(1, Ordering::Relaxed);
38        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
39    }
40
41    pub fn record_retried(&self) {
42        self.tasks_retried.fetch_add(1, Ordering::Relaxed);
43        // Don't change queue depth for retries
44    }
45
46    pub fn record_processing_time(&self, duration_ms: u64) {
47        self.total_processing_time_ms
48            .fetch_add(duration_ms, Ordering::Relaxed);
49    }
50
51    pub fn get_queue_depth(&self) -> u64 {
52        self.queue_depth.load(Ordering::Relaxed)
53    }
54
55    pub fn snapshot(&self) -> MetricsSnapshot {
56        let queued = self.tasks_queued.load(Ordering::Relaxed);
57        let completed = self.tasks_completed.load(Ordering::Relaxed);
58        let failed = self.tasks_failed.load(Ordering::Relaxed);
59        let retried = self.tasks_retried.load(Ordering::Relaxed);
60        let queue_depth = self.queue_depth.load(Ordering::Relaxed);
61        let total_time = self.total_processing_time_ms.load(Ordering::Relaxed);
62        let peak_depth = self.peak_queue_depth.load(Ordering::Relaxed);
63        let reset_timestamp = self.reset_at.load(Ordering::Relaxed);
64
65        let processed = completed + failed;
66        let success_rate = if processed > 0 {
67            (completed as f64 / processed as f64) * 100.0
68        } else {
69            0.0
70        };
71
72        let avg_processing_time = if completed > 0 {
73            total_time as f64 / completed as f64
74        } else {
75            0.0
76        };
77
78        MetricsSnapshot {
79            tasks_queued: queued,
80            tasks_completed: completed,
81            tasks_failed: failed,
82            tasks_retried: retried,
83            tasks_in_progress: queue_depth,
84            peak_queue_depth: peak_depth,
85            success_rate_percent: success_rate,
86            average_processing_time_ms: avg_processing_time,
87            total_processing_time_ms: total_time,
88            uptime_seconds: chrono::Utc::now().timestamp() as u64 - reset_timestamp,
89            timestamp: Utc::now(),
90        }
91    }
92
93    pub fn reset(&self) {
94        self.tasks_queued.store(0, Ordering::Relaxed);
95        self.tasks_completed.store(0, Ordering::Relaxed);
96        self.tasks_failed.store(0, Ordering::Relaxed);
97        self.tasks_retried.store(0, Ordering::Relaxed);
98        self.total_processing_time_ms.store(0, Ordering::Relaxed);
99        self.peak_queue_depth
100            .store(self.queue_depth.load(Ordering::Relaxed), Ordering::Relaxed);
101        self.reset_at
102            .store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
103    }
104
105    fn update_peak_depth(&self, current_depth: u64) {
106        self.peak_queue_depth
107            .fetch_max(current_depth, Ordering::Relaxed);
108    }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct MetricsSnapshot {
113    pub tasks_queued: u64,
114    pub tasks_completed: u64,
115    pub tasks_failed: u64,
116    pub tasks_retried: u64,
117    pub tasks_in_progress: u64,
118    pub peak_queue_depth: u64,
119    pub success_rate_percent: f64,
120    pub average_processing_time_ms: f64,
121    pub total_processing_time_ms: u64,
122    pub uptime_seconds: u64,
123    pub timestamp: DateTime<Utc>,
124}
125
126impl MetricsSnapshot {
127    pub fn throughput_per_second(&self) -> f64 {
128        if self.uptime_seconds > 0 {
129            (self.tasks_completed + self.tasks_failed) as f64 / self.uptime_seconds as f64
130        } else {
131            0.0
132        }
133    }
134
135    pub fn failure_rate_percent(&self) -> f64 {
136        100.0 - self.success_rate_percent
137    }
138
139    pub fn is_healthy(&self) -> bool {
140        self.success_rate_percent >= 95.0
141            && self.tasks_in_progress < (crate::types::MAX_QUEUE_SIZE as u64 / 2)
142    }
143}