// dynamo_runtime/compute/metrics.rs

// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Metrics for monitoring compute pool operations

6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::time::Duration;
8
/// Metrics for the compute pool
///
/// Every counter is an independent atomic updated with [`Ordering::Relaxed`], and all
/// methods take `&self`, so one instance can be shared across threads without a lock.
/// Each individual read is consistent, but values read one after another (for example
/// by [`ComputeMetrics::avg_task_duration_us`]) do not form an atomic snapshot.
#[derive(Debug)]
pub struct ComputeMetrics {
    /// Total number of tasks executed
    tasks_total: AtomicU64,

    /// Number of tasks currently running
    tasks_active: AtomicUsize,

    /// Total time spent in compute tasks (microseconds)
    total_compute_time_us: AtomicU64,

    /// Maximum task duration seen (microseconds)
    max_task_duration_us: AtomicU64,

    /// Number of tasks that took longer than 100ms
    slow_tasks: AtomicU64,
}

impl ComputeMetrics {
    /// Tasks whose duration is strictly greater than this are counted as slow.
    const SLOW_TASK_THRESHOLD: Duration = Duration::from_millis(100);

    /// Create new metrics instance with every counter at zero.
    pub fn new() -> Self {
        Self {
            tasks_total: AtomicU64::new(0),
            tasks_active: AtomicUsize::new(0),
            total_compute_time_us: AtomicU64::new(0),
            max_task_duration_us: AtomicU64::new(0),
            slow_tasks: AtomicU64::new(0),
        }
    }

    /// Record that a task has started.
    ///
    /// Increments the active-task gauge; pair each call with one call to
    /// [`ComputeMetrics::record_task_completion`].
    pub fn record_task_start(&self) {
        self.tasks_active.fetch_add(1, Ordering::Relaxed);
    }

    /// Record that a task has completed after running for `duration`.
    ///
    /// This method:
    /// - decrements the active-task gauge, saturating at zero;
    /// - increments the completed-task total;
    /// - adds `duration` to the running sum;
    /// - raises the recorded maximum if `duration` exceeds it;
    /// - counts the task as slow when `duration` is strictly greater than 100ms.
    ///
    /// Durations whose microsecond count does not fit in `u64` are clamped to `u64::MAX`.
    pub fn record_task_completion(&self, duration: Duration) {
        // Saturate instead of wrapping to usize::MAX when a completion arrives without a
        // matching start, e.g. after `reset()` zeroed the gauge while tasks were in flight.
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .tasks_active
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |active| {
                Some(active.saturating_sub(1))
            });
        self.tasks_total.fetch_add(1, Ordering::Relaxed);

        // Use saturating conversion to prevent overflow
        let duration_us = duration.as_micros().min(u64::MAX as u128) as u64;
        self.total_compute_time_us
            .fetch_add(duration_us, Ordering::Relaxed);

        // Atomic read-modify-write: keeps the larger of the stored and new values.
        self.max_task_duration_us
            .fetch_max(duration_us, Ordering::Relaxed);

        // Compare full-precision durations; `as_millis()` would truncate 100.9ms to 100
        // and miss tasks that overran the threshold by less than a millisecond.
        if duration > Self::SLOW_TASK_THRESHOLD {
            self.slow_tasks.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Get total number of tasks executed
    pub fn tasks_total(&self) -> u64 {
        self.tasks_total.load(Ordering::Relaxed)
    }

    /// Get number of currently active tasks
    pub fn tasks_active(&self) -> usize {
        self.tasks_active.load(Ordering::Relaxed)
    }

    /// Get average task duration in microseconds.
    ///
    /// Returns `0.0` when no tasks have completed. The total and the time sum are read
    /// separately, so a concurrent completion may skew a single result slightly.
    pub fn avg_task_duration_us(&self) -> f64 {
        let total = self.tasks_total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }

        let total_time = self.total_compute_time_us.load(Ordering::Relaxed);
        total_time as f64 / total as f64
    }

    /// Get maximum task duration in microseconds
    pub fn max_task_duration_us(&self) -> u64 {
        self.max_task_duration_us.load(Ordering::Relaxed)
    }

    /// Get number of slow tasks (> 100ms)
    pub fn slow_tasks(&self) -> u64 {
        self.slow_tasks.load(Ordering::Relaxed)
    }

    /// Reset all metrics to zero.
    ///
    /// Each counter is cleared independently, so updates racing with a reset may survive
    /// in some fields and not others. The active-task gauge is zeroed too; tasks still in
    /// flight will then undercount it when they complete (it saturates at zero rather
    /// than wrapping).
    pub fn reset(&self) {
        self.tasks_total.store(0, Ordering::Relaxed);
        self.tasks_active.store(0, Ordering::Relaxed);
        self.total_compute_time_us.store(0, Ordering::Relaxed);
        self.max_task_duration_us.store(0, Ordering::Relaxed);
        self.slow_tasks.store(0, Ordering::Relaxed);
    }
}

116impl Default for ComputeMetrics {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122/// Format metrics as a human-readable string
123impl std::fmt::Display for ComputeMetrics {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        write!(
126            f,
127            "ComputeMetrics {{ tasks_total: {}, tasks_active: {}, avg_duration_ms: {:.2}, max_duration_ms: {:.2}, slow_tasks: {} }}",
128            self.tasks_total(),
129            self.tasks_active(),
130            self.avg_task_duration_us() / 1000.0,
131            self.max_task_duration_us() as f64 / 1000.0,
132            self.slow_tasks(),
133        )
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Records one task that starts and then completes after `duration`.
    fn run_task(metrics: &ComputeMetrics, duration: Duration) {
        metrics.record_task_start();
        metrics.record_task_completion(duration);
    }

    #[test]
    fn test_metrics_recording() {
        let metrics = ComputeMetrics::new();

        assert_eq!(metrics.tasks_total(), 0);
        assert_eq!(metrics.tasks_active(), 0);

        metrics.record_task_start();
        assert_eq!(metrics.tasks_active(), 1);

        metrics.record_task_completion(Duration::from_millis(50));
        assert_eq!(metrics.tasks_active(), 0);
        assert_eq!(metrics.tasks_total(), 1);
        assert_eq!(metrics.slow_tasks(), 0);

        metrics.record_task_start();
        metrics.record_task_completion(Duration::from_millis(150));
        assert_eq!(metrics.tasks_total(), 2);
        assert_eq!(metrics.slow_tasks(), 1);
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = ComputeMetrics::new();

        run_task(&metrics, Duration::from_millis(50));
        run_task(&metrics, Duration::from_millis(150));
        assert_eq!(metrics.tasks_total(), 2);
        assert_eq!(metrics.slow_tasks(), 1);

        metrics.reset();
        // Every counter, not just the totals, must be cleared.
        assert_eq!(metrics.tasks_total(), 0);
        assert_eq!(metrics.tasks_active(), 0);
        assert_eq!(metrics.max_task_duration_us(), 0);
        assert_eq!(metrics.slow_tasks(), 0);
        assert_eq!(metrics.avg_task_duration_us(), 0.0);
    }

    #[test]
    fn test_max_and_average_duration() {
        let metrics = ComputeMetrics::new();
        assert_eq!(metrics.avg_task_duration_us(), 0.0);

        run_task(&metrics, Duration::from_millis(30));
        run_task(&metrics, Duration::from_millis(80));
        run_task(&metrics, Duration::from_millis(10));

        // The maximum must not drop when a shorter task follows a longer one.
        assert_eq!(metrics.max_task_duration_us(), 80_000);
        assert!((metrics.avg_task_duration_us() - 40_000.0).abs() < 1e-9);
    }

    #[test]
    fn test_slow_threshold_is_exclusive() {
        let metrics = ComputeMetrics::new();

        // Exactly 100ms is not "longer than 100ms".
        run_task(&metrics, Duration::from_millis(100));
        assert_eq!(metrics.slow_tasks(), 0);

        run_task(&metrics, Duration::from_millis(101));
        assert_eq!(metrics.slow_tasks(), 1);
    }

    #[test]
    fn test_display_format() {
        let metrics = ComputeMetrics::new();
        run_task(&metrics, Duration::from_millis(50));
        run_task(&metrics, Duration::from_millis(150));

        assert_eq!(
            metrics.to_string(),
            "ComputeMetrics { tasks_total: 2, tasks_active: 0, avg_duration_ms: 100.00, \
             max_duration_ms: 150.00, slow_tasks: 1 }"
        );
    }
}