dynamo_runtime/compute/
metrics.rs1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
9#[derive(Debug)]
11pub struct ComputeMetrics {
12 tasks_total: AtomicU64,
14
15 tasks_active: AtomicUsize,
17
18 total_compute_time_us: AtomicU64,
20
21 max_task_duration_us: AtomicU64,
23
24 slow_tasks: AtomicU64,
26}
27
28impl ComputeMetrics {
29 pub fn new() -> Self {
31 Self {
32 tasks_total: AtomicU64::new(0),
33 tasks_active: AtomicUsize::new(0),
34 total_compute_time_us: AtomicU64::new(0),
35 max_task_duration_us: AtomicU64::new(0),
36 slow_tasks: AtomicU64::new(0),
37 }
38 }
39
40 pub fn record_task_start(&self) {
42 self.tasks_active.fetch_add(1, Ordering::Relaxed);
43 }
44
45 pub fn record_task_completion(&self, duration: Duration) {
47 self.tasks_active.fetch_sub(1, Ordering::Relaxed);
48 self.tasks_total.fetch_add(1, Ordering::Relaxed);
49
50 let duration_us = duration.as_micros().min(u64::MAX as u128) as u64;
52 self.total_compute_time_us
53 .fetch_add(duration_us, Ordering::Relaxed);
54
55 let mut current_max = self.max_task_duration_us.load(Ordering::Relaxed);
57 while duration_us > current_max {
58 match self.max_task_duration_us.compare_exchange_weak(
59 current_max,
60 duration_us,
61 Ordering::SeqCst,
62 Ordering::Relaxed,
63 ) {
64 Ok(_) => break,
65 Err(x) => current_max = x,
66 }
67 }
68
69 if duration.as_millis() > 100 {
71 self.slow_tasks.fetch_add(1, Ordering::Relaxed);
72 }
73 }
74
75 pub fn tasks_total(&self) -> u64 {
77 self.tasks_total.load(Ordering::Relaxed)
78 }
79
80 pub fn tasks_active(&self) -> usize {
82 self.tasks_active.load(Ordering::Relaxed)
83 }
84
85 pub fn avg_task_duration_us(&self) -> f64 {
87 let total = self.tasks_total.load(Ordering::Relaxed);
88 if total == 0 {
89 return 0.0;
90 }
91
92 let total_time = self.total_compute_time_us.load(Ordering::Relaxed);
93 total_time as f64 / total as f64
94 }
95
96 pub fn max_task_duration_us(&self) -> u64 {
98 self.max_task_duration_us.load(Ordering::Relaxed)
99 }
100
101 pub fn slow_tasks(&self) -> u64 {
103 self.slow_tasks.load(Ordering::Relaxed)
104 }
105
106 pub fn reset(&self) {
108 self.tasks_total.store(0, Ordering::Relaxed);
109 self.tasks_active.store(0, Ordering::Relaxed);
110 self.total_compute_time_us.store(0, Ordering::Relaxed);
111 self.max_task_duration_us.store(0, Ordering::Relaxed);
112 self.slow_tasks.store(0, Ordering::Relaxed);
113 }
114}
115
116impl Default for ComputeMetrics {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122impl std::fmt::Display for ComputeMetrics {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 write!(
126 f,
127 "ComputeMetrics {{ tasks_total: {}, tasks_active: {}, avg_duration_ms: {:.2}, max_duration_ms: {:.2}, slow_tasks: {} }}",
128 self.tasks_total(),
129 self.tasks_active(),
130 self.avg_task_duration_us() / 1000.0,
131 self.max_task_duration_us() as f64 / 1000.0,
132 self.slow_tasks(),
133 )
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140
141 #[test]
142 fn test_metrics_recording() {
143 let metrics = ComputeMetrics::new();
144
145 assert_eq!(metrics.tasks_total(), 0);
146 assert_eq!(metrics.tasks_active(), 0);
147
148 metrics.record_task_start();
149 assert_eq!(metrics.tasks_active(), 1);
150
151 metrics.record_task_completion(Duration::from_millis(50));
152 assert_eq!(metrics.tasks_active(), 0);
153 assert_eq!(metrics.tasks_total(), 1);
154 assert_eq!(metrics.slow_tasks(), 0);
155
156 metrics.record_task_start();
157 metrics.record_task_completion(Duration::from_millis(150));
158 assert_eq!(metrics.tasks_total(), 2);
159 assert_eq!(metrics.slow_tasks(), 1);
160 }
161
162 #[test]
163 fn test_metrics_reset() {
164 let metrics = ComputeMetrics::new();
165
166 metrics.record_task_start();
167 metrics.record_task_completion(Duration::from_millis(50));
168 assert_eq!(metrics.tasks_total(), 1);
169
170 metrics.reset();
171 assert_eq!(metrics.tasks_total(), 0);
172 assert_eq!(metrics.tasks_active(), 0);
173 }
174}