1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::sync::atomic::{AtomicU64, Ordering};
4
5#[derive(Debug, Default)]
6pub struct TaskMetrics {
7 tasks_queued: AtomicU64,
8 tasks_completed: AtomicU64,
9 tasks_failed: AtomicU64,
10 tasks_retried: AtomicU64,
11 queue_depth: AtomicU64,
12 total_processing_time_ms: AtomicU64,
13 peak_queue_depth: AtomicU64,
14 reset_at: AtomicU64, }
16
17impl TaskMetrics {
18 pub fn new() -> Self {
19 Self {
20 reset_at: AtomicU64::new(chrono::Utc::now().timestamp() as u64),
21 ..Default::default()
22 }
23 }
24
25 pub fn record_queued(&self) {
26 self.tasks_queued.fetch_add(1, Ordering::Relaxed);
27 let new_depth = self.queue_depth.fetch_add(1, Ordering::Relaxed) + 1;
28 self.update_peak_depth(new_depth);
29 }
30
31 pub fn record_completed(&self) {
32 self.tasks_completed.fetch_add(1, Ordering::Relaxed);
33 self.queue_depth.fetch_sub(1, Ordering::Relaxed);
34 }
35
36 pub fn record_failed(&self) {
37 self.tasks_failed.fetch_add(1, Ordering::Relaxed);
38 self.queue_depth.fetch_sub(1, Ordering::Relaxed);
39 }
40
41 pub fn record_retried(&self) {
42 self.tasks_retried.fetch_add(1, Ordering::Relaxed);
43 }
45
46 pub fn record_processing_time(&self, duration_ms: u64) {
47 self.total_processing_time_ms
48 .fetch_add(duration_ms, Ordering::Relaxed);
49 }
50
51 pub fn get_queue_depth(&self) -> u64 {
52 self.queue_depth.load(Ordering::Relaxed)
53 }
54
55 pub fn snapshot(&self) -> MetricsSnapshot {
56 let queued = self.tasks_queued.load(Ordering::Relaxed);
57 let completed = self.tasks_completed.load(Ordering::Relaxed);
58 let failed = self.tasks_failed.load(Ordering::Relaxed);
59 let retried = self.tasks_retried.load(Ordering::Relaxed);
60 let queue_depth = self.queue_depth.load(Ordering::Relaxed);
61 let total_time = self.total_processing_time_ms.load(Ordering::Relaxed);
62 let peak_depth = self.peak_queue_depth.load(Ordering::Relaxed);
63 let reset_timestamp = self.reset_at.load(Ordering::Relaxed);
64
65 let processed = completed + failed;
66 let success_rate = if processed > 0 {
67 (completed as f64 / processed as f64) * 100.0
68 } else {
69 0.0
70 };
71
72 let avg_processing_time = if completed > 0 {
73 total_time as f64 / completed as f64
74 } else {
75 0.0
76 };
77
78 MetricsSnapshot {
79 tasks_queued: queued,
80 tasks_completed: completed,
81 tasks_failed: failed,
82 tasks_retried: retried,
83 tasks_in_progress: queue_depth,
84 peak_queue_depth: peak_depth,
85 success_rate_percent: success_rate,
86 average_processing_time_ms: avg_processing_time,
87 total_processing_time_ms: total_time,
88 uptime_seconds: chrono::Utc::now().timestamp() as u64 - reset_timestamp,
89 timestamp: Utc::now(),
90 }
91 }
92
93 pub fn reset(&self) {
94 self.tasks_queued.store(0, Ordering::Relaxed);
95 self.tasks_completed.store(0, Ordering::Relaxed);
96 self.tasks_failed.store(0, Ordering::Relaxed);
97 self.tasks_retried.store(0, Ordering::Relaxed);
98 self.total_processing_time_ms.store(0, Ordering::Relaxed);
99 self.peak_queue_depth
100 .store(self.queue_depth.load(Ordering::Relaxed), Ordering::Relaxed);
101 self.reset_at
102 .store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
103 }
104
105 fn update_peak_depth(&self, current_depth: u64) {
106 self.peak_queue_depth
107 .fetch_max(current_depth, Ordering::Relaxed);
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct MetricsSnapshot {
113 pub tasks_queued: u64,
114 pub tasks_completed: u64,
115 pub tasks_failed: u64,
116 pub tasks_retried: u64,
117 pub tasks_in_progress: u64,
118 pub peak_queue_depth: u64,
119 pub success_rate_percent: f64,
120 pub average_processing_time_ms: f64,
121 pub total_processing_time_ms: u64,
122 pub uptime_seconds: u64,
123 pub timestamp: DateTime<Utc>,
124}
125
126impl MetricsSnapshot {
127 pub fn throughput_per_second(&self) -> f64 {
128 if self.uptime_seconds > 0 {
129 (self.tasks_completed + self.tasks_failed) as f64 / self.uptime_seconds as f64
130 } else {
131 0.0
132 }
133 }
134
135 pub fn failure_rate_percent(&self) -> f64 {
136 100.0 - self.success_rate_percent
137 }
138
139 pub fn is_healthy(&self) -> bool {
140 self.success_rate_percent >= 95.0
141 && self.tasks_in_progress < (crate::types::MAX_QUEUE_SIZE as u64 / 2)
142 }
143}