// cuenv_ci/executor/metrics.rs

1//! Cache Metrics
2//!
3//! Provides metrics collection for cache operations and task execution.
4//! Metrics are exposed in formats compatible with Prometheus and OpenTelemetry.
5
6use crate::ir::CachePolicy;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10
/// Cache metrics collector
///
/// Aggregates cache and task-execution metrics for export (see
/// `to_prometheus`). Counters are lock-free atomics updated with relaxed
/// ordering; the per-task / per-runtime duration maps inside the trackers
/// are guarded by `RwLock`s, so the collector is safe to share across
/// threads behind an `Arc`.
#[derive(Debug, Default)]
pub struct CacheMetrics {
    /// Total cache hits by policy
    hits: PolicyCounters,
    /// Total cache misses by policy
    misses: PolicyCounters,
    /// Total cache restore failures by error type
    restore_failures: ErrorCounters,
    /// Total bytes downloaded from cache
    bytes_downloaded: AtomicU64,
    /// Total bytes uploaded to cache
    bytes_uploaded: AtomicU64,
    /// Total cache check latency (microseconds), summed across all checks
    check_latency_us: AtomicU64,
    /// Total cache checks; divides `check_latency_us` to give the average
    check_count: AtomicU64,
    /// Task execution durations (microseconds)
    task_durations: TaskDurations,
    /// Runtime materialization durations (microseconds)
    runtime_durations: RuntimeDurations,
}
33
/// Task execution duration tracking
#[derive(Debug, Default)]
struct TaskDurations {
    /// Total execution time across all tasks (microseconds)
    total_us: AtomicU64,
    /// Number of tasks executed
    count: AtomicU64,
    /// Per-task durations (`task_id` -> `duration_us`); recording the same
    /// task id again overwrites the previous entry (last write wins)
    per_task: RwLock<HashMap<String, u64>>,
}
44
/// Runtime materialization duration tracking
#[derive(Debug, Default)]
struct RuntimeDurations {
    /// Total materialization time (microseconds)
    total_us: AtomicU64,
    /// Number of runtimes materialized
    count: AtomicU64,
    /// Per-runtime durations (`runtime_id` -> `duration_us`); recording the
    /// same runtime id again overwrites the previous entry (last write wins)
    per_runtime: RwLock<HashMap<String, u64>>,
}
55
/// Counters by cache policy
///
/// One atomic counter per `CachePolicy` variant, updated via `increment`
/// and read via `get`.
#[derive(Debug, Default)]
struct PolicyCounters {
    /// Count for `CachePolicy::Normal`
    normal: AtomicU64,
    /// Count for `CachePolicy::Readonly`
    readonly: AtomicU64,
    /// Count for `CachePolicy::Writeonly`
    writeonly: AtomicU64,
    /// Count for `CachePolicy::Disabled`
    disabled: AtomicU64,
}
64
/// Counters by error type
///
/// One atomic counter per `RestoreErrorType` variant, updated via
/// `increment` and read via `get`.
#[derive(Debug, Default)]
struct ErrorCounters {
    /// Count for `RestoreErrorType::Connection`
    connection: AtomicU64,
    /// Count for `RestoreErrorType::Timeout`
    timeout: AtomicU64,
    /// Count for `RestoreErrorType::NotFound`
    not_found: AtomicU64,
    /// Count for `RestoreErrorType::DigestMismatch`
    digest_mismatch: AtomicU64,
    /// Count for `RestoreErrorType::Other`
    other: AtomicU64,
}
74
75impl CacheMetrics {
76    /// Create new metrics collector
77    #[must_use]
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    /// Record a cache hit
83    pub fn record_hit(&self, policy: CachePolicy, task_id: &str) {
84        self.hits.increment(policy);
85        tracing::debug!(
86            task = %task_id,
87            policy = ?policy,
88            metric = "cuenv_cache_hit_total",
89            "Cache hit recorded"
90        );
91    }
92
93    /// Record a cache miss
94    pub fn record_miss(&self, policy: CachePolicy, task_id: &str) {
95        self.misses.increment(policy);
96        tracing::debug!(
97            task = %task_id,
98            policy = ?policy,
99            metric = "cuenv_cache_miss_total",
100            "Cache miss recorded"
101        );
102    }
103
104    /// Record a cache restore failure
105    pub fn record_restore_failure(&self, error_type: RestoreErrorType, task_id: &str) {
106        self.restore_failures.increment(error_type);
107        tracing::debug!(
108            task = %task_id,
109            error_type = ?error_type,
110            metric = "cuenv_cache_restore_failure_total",
111            "Cache restore failure recorded"
112        );
113    }
114
115    /// Record bytes downloaded
116    pub fn record_download(&self, bytes: u64) {
117        self.bytes_downloaded.fetch_add(bytes, Ordering::Relaxed);
118    }
119
120    /// Record bytes uploaded
121    pub fn record_upload(&self, bytes: u64) {
122        self.bytes_uploaded.fetch_add(bytes, Ordering::Relaxed);
123    }
124
125    /// Record cache check latency
126    pub fn record_check_latency(&self, latency_us: u64) {
127        self.check_latency_us
128            .fetch_add(latency_us, Ordering::Relaxed);
129        self.check_count.fetch_add(1, Ordering::Relaxed);
130    }
131
132    /// Record task execution duration
133    pub fn record_task_duration(&self, task_id: &str, millis: u64) {
134        let micros = millis * 1000;
135        self.task_durations
136            .total_us
137            .fetch_add(micros, Ordering::Relaxed);
138        self.task_durations.count.fetch_add(1, Ordering::Relaxed);
139
140        if let Ok(mut map) = self.task_durations.per_task.write() {
141            map.insert(task_id.to_string(), micros);
142        }
143
144        tracing::debug!(
145            task = %task_id,
146            duration_ms = millis,
147            metric = "cuenv_task_duration_seconds",
148            "Task duration recorded"
149        );
150    }
151
152    /// Record runtime materialization duration
153    pub fn record_runtime_materialization(&self, runtime_id: &str, millis: u64) {
154        let micros = millis * 1000;
155        self.runtime_durations
156            .total_us
157            .fetch_add(micros, Ordering::Relaxed);
158        self.runtime_durations.count.fetch_add(1, Ordering::Relaxed);
159
160        if let Ok(mut map) = self.runtime_durations.per_runtime.write() {
161            map.insert(runtime_id.to_string(), micros);
162        }
163
164        tracing::debug!(
165            runtime = %runtime_id,
166            duration_ms = millis,
167            metric = "cuenv_runtime_materialization_seconds",
168            "Runtime materialization recorded"
169        );
170    }
171
172    /// Get total task execution time in milliseconds
173    #[must_use]
174    pub fn total_task_time_ms(&self) -> u64 {
175        self.task_durations.total_us.load(Ordering::Relaxed) / 1000
176    }
177
178    /// Get number of tasks executed
179    #[must_use]
180    pub fn task_count(&self) -> u64 {
181        self.task_durations.count.load(Ordering::Relaxed)
182    }
183
184    /// Get average task duration in milliseconds
185    #[must_use]
186    pub fn avg_task_duration_ms(&self) -> u64 {
187        let count = self.task_durations.count.load(Ordering::Relaxed);
188        if count == 0 {
189            return 0;
190        }
191        self.task_durations.total_us.load(Ordering::Relaxed) / count / 1000
192    }
193
194    /// Get total runtime materialization time in milliseconds
195    #[must_use]
196    pub fn total_runtime_time_ms(&self) -> u64 {
197        self.runtime_durations.total_us.load(Ordering::Relaxed) / 1000
198    }
199
200    /// Get number of runtimes materialized
201    #[must_use]
202    pub fn runtime_count(&self) -> u64 {
203        self.runtime_durations.count.load(Ordering::Relaxed)
204    }
205
206    /// Get total hits for a policy
207    #[must_use]
208    pub fn hits(&self, policy: CachePolicy) -> u64 {
209        self.hits.get(policy)
210    }
211
212    /// Get total misses for a policy
213    #[must_use]
214    pub fn misses(&self, policy: CachePolicy) -> u64 {
215        self.misses.get(policy)
216    }
217
218    /// Get total restore failures for an error type
219    #[must_use]
220    pub fn restore_failures(&self, error_type: RestoreErrorType) -> u64 {
221        self.restore_failures.get(error_type)
222    }
223
224    /// Get total bytes downloaded
225    #[must_use]
226    pub fn bytes_downloaded(&self) -> u64 {
227        self.bytes_downloaded.load(Ordering::Relaxed)
228    }
229
230    /// Get total bytes uploaded
231    #[must_use]
232    pub fn bytes_uploaded(&self) -> u64 {
233        self.bytes_uploaded.load(Ordering::Relaxed)
234    }
235
236    /// Get average check latency in microseconds
237    #[must_use]
238    pub fn avg_check_latency_us(&self) -> u64 {
239        let count = self.check_count.load(Ordering::Relaxed);
240        if count == 0 {
241            return 0;
242        }
243        self.check_latency_us.load(Ordering::Relaxed) / count
244    }
245
246    /// Calculate cache hit rate (0.0 - 1.0)
247    #[must_use]
248    #[allow(clippy::cast_precision_loss)] // Precision loss acceptable for hit rate calculation
249    pub fn hit_rate(&self) -> f64 {
250        let total_hits: u64 = [
251            CachePolicy::Normal,
252            CachePolicy::Readonly,
253            CachePolicy::Writeonly,
254            CachePolicy::Disabled,
255        ]
256        .iter()
257        .map(|p| self.hits(*p))
258        .sum();
259
260        let total_misses: u64 = [
261            CachePolicy::Normal,
262            CachePolicy::Readonly,
263            CachePolicy::Writeonly,
264            CachePolicy::Disabled,
265        ]
266        .iter()
267        .map(|p| self.misses(*p))
268        .sum();
269
270        let total = total_hits + total_misses;
271        if total == 0 {
272            return 0.0;
273        }
274        total_hits as f64 / total as f64
275    }
276
277    /// Export metrics in Prometheus format
278    #[must_use]
279    #[allow(clippy::cast_precision_loss)] // Precision loss acceptable for metrics export
280    #[allow(clippy::too_many_lines)] // Prometheus format requires many lines
281    pub fn to_prometheus(&self) -> String {
282        use std::fmt::Write;
283
284        let mut output = String::new();
285
286        // Cache hits
287        output.push_str("# HELP cuenv_cache_hit_total Total number of cache hits\n");
288        output.push_str("# TYPE cuenv_cache_hit_total counter\n");
289        for policy in &["normal", "readonly", "writeonly", "disabled"] {
290            let count = match *policy {
291                "normal" => self.hits.normal.load(Ordering::Relaxed),
292                "readonly" => self.hits.readonly.load(Ordering::Relaxed),
293                "writeonly" => self.hits.writeonly.load(Ordering::Relaxed),
294                "disabled" => self.hits.disabled.load(Ordering::Relaxed),
295                _ => 0,
296            };
297            let _ = writeln!(
298                output,
299                "cuenv_cache_hit_total{{policy=\"{policy}\"}} {count}"
300            );
301        }
302
303        // Cache misses
304        output.push_str("# HELP cuenv_cache_miss_total Total number of cache misses\n");
305        output.push_str("# TYPE cuenv_cache_miss_total counter\n");
306        for policy in &["normal", "readonly", "writeonly", "disabled"] {
307            let count = match *policy {
308                "normal" => self.misses.normal.load(Ordering::Relaxed),
309                "readonly" => self.misses.readonly.load(Ordering::Relaxed),
310                "writeonly" => self.misses.writeonly.load(Ordering::Relaxed),
311                "disabled" => self.misses.disabled.load(Ordering::Relaxed),
312                _ => 0,
313            };
314            let _ = writeln!(
315                output,
316                "cuenv_cache_miss_total{{policy=\"{policy}\"}} {count}"
317            );
318        }
319
320        // Restore failures
321        output.push_str(
322            "# HELP cuenv_cache_restore_failure_total Total number of cache restore failures\n",
323        );
324        output.push_str("# TYPE cuenv_cache_restore_failure_total counter\n");
325        for error_type in &[
326            "connection",
327            "timeout",
328            "not_found",
329            "digest_mismatch",
330            "other",
331        ] {
332            let count = match *error_type {
333                "connection" => self.restore_failures.connection.load(Ordering::Relaxed),
334                "timeout" => self.restore_failures.timeout.load(Ordering::Relaxed),
335                "not_found" => self.restore_failures.not_found.load(Ordering::Relaxed),
336                "digest_mismatch" => self
337                    .restore_failures
338                    .digest_mismatch
339                    .load(Ordering::Relaxed),
340                "other" => self.restore_failures.other.load(Ordering::Relaxed),
341                _ => 0,
342            };
343            let _ = writeln!(
344                output,
345                "cuenv_cache_restore_failure_total{{error_type=\"{error_type}\"}} {count}"
346            );
347        }
348
349        // Bytes transferred
350        output.push_str(
351            "# HELP cuenv_cache_bytes_downloaded_total Total bytes downloaded from cache\n",
352        );
353        output.push_str("# TYPE cuenv_cache_bytes_downloaded_total counter\n");
354        let _ = writeln!(
355            output,
356            "cuenv_cache_bytes_downloaded_total {}",
357            self.bytes_downloaded.load(Ordering::Relaxed)
358        );
359
360        output.push_str("# HELP cuenv_cache_bytes_uploaded_total Total bytes uploaded to cache\n");
361        output.push_str("# TYPE cuenv_cache_bytes_uploaded_total counter\n");
362        let _ = writeln!(
363            output,
364            "cuenv_cache_bytes_uploaded_total {}",
365            self.bytes_uploaded.load(Ordering::Relaxed)
366        );
367
368        // Task execution metrics
369        output.push_str(
370            "# HELP cuenv_task_duration_seconds_total Total task execution time in seconds\n",
371        );
372        output.push_str("# TYPE cuenv_task_duration_seconds_total counter\n");
373        let task_total_secs =
374            self.task_durations.total_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
375        let _ = writeln!(
376            output,
377            "cuenv_task_duration_seconds_total {task_total_secs:.3}"
378        );
379
380        output.push_str("# HELP cuenv_tasks_executed_total Total number of tasks executed\n");
381        output.push_str("# TYPE cuenv_tasks_executed_total counter\n");
382        let _ = writeln!(
383            output,
384            "cuenv_tasks_executed_total {}",
385            self.task_durations.count.load(Ordering::Relaxed)
386        );
387
388        // Runtime materialization metrics
389        output.push_str("# HELP cuenv_runtime_materialization_seconds_total Total runtime materialization time in seconds\n");
390        output.push_str("# TYPE cuenv_runtime_materialization_seconds_total counter\n");
391        let runtime_total_secs =
392            self.runtime_durations.total_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
393        let _ = writeln!(
394            output,
395            "cuenv_runtime_materialization_seconds_total {runtime_total_secs:.3}"
396        );
397
398        output.push_str(
399            "# HELP cuenv_runtimes_materialized_total Total number of runtimes materialized\n",
400        );
401        output.push_str("# TYPE cuenv_runtimes_materialized_total counter\n");
402        let _ = writeln!(
403            output,
404            "cuenv_runtimes_materialized_total {}",
405            self.runtime_durations.count.load(Ordering::Relaxed)
406        );
407
408        output
409    }
410}
411
412impl PolicyCounters {
413    fn increment(&self, policy: CachePolicy) {
414        match policy {
415            CachePolicy::Normal => self.normal.fetch_add(1, Ordering::Relaxed),
416            CachePolicy::Readonly => self.readonly.fetch_add(1, Ordering::Relaxed),
417            CachePolicy::Writeonly => self.writeonly.fetch_add(1, Ordering::Relaxed),
418            CachePolicy::Disabled => self.disabled.fetch_add(1, Ordering::Relaxed),
419        };
420    }
421
422    fn get(&self, policy: CachePolicy) -> u64 {
423        match policy {
424            CachePolicy::Normal => self.normal.load(Ordering::Relaxed),
425            CachePolicy::Readonly => self.readonly.load(Ordering::Relaxed),
426            CachePolicy::Writeonly => self.writeonly.load(Ordering::Relaxed),
427            CachePolicy::Disabled => self.disabled.load(Ordering::Relaxed),
428        }
429    }
430}
431
/// Error types for restore failures
///
/// Used as the `error_type` label for `cuenv_cache_restore_failure_total`
/// in the Prometheus export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreErrorType {
    /// Connection error
    Connection,
    /// Timeout
    Timeout,
    /// Blob not found
    NotFound,
    /// Digest mismatch
    DigestMismatch,
    /// Other error
    Other,
}
446
447impl ErrorCounters {
448    fn increment(&self, error_type: RestoreErrorType) {
449        match error_type {
450            RestoreErrorType::Connection => self.connection.fetch_add(1, Ordering::Relaxed),
451            RestoreErrorType::Timeout => self.timeout.fetch_add(1, Ordering::Relaxed),
452            RestoreErrorType::NotFound => self.not_found.fetch_add(1, Ordering::Relaxed),
453            RestoreErrorType::DigestMismatch => {
454                self.digest_mismatch.fetch_add(1, Ordering::Relaxed)
455            }
456            RestoreErrorType::Other => self.other.fetch_add(1, Ordering::Relaxed),
457        };
458    }
459
460    fn get(&self, error_type: RestoreErrorType) -> u64 {
461        match error_type {
462            RestoreErrorType::Connection => self.connection.load(Ordering::Relaxed),
463            RestoreErrorType::Timeout => self.timeout.load(Ordering::Relaxed),
464            RestoreErrorType::NotFound => self.not_found.load(Ordering::Relaxed),
465            RestoreErrorType::DigestMismatch => self.digest_mismatch.load(Ordering::Relaxed),
466            RestoreErrorType::Other => self.other.load(Ordering::Relaxed),
467        }
468    }
469}
470
471/// Global metrics instance
472static GLOBAL_METRICS: std::sync::OnceLock<Arc<CacheMetrics>> = std::sync::OnceLock::new();
473
474/// Get or initialize global cache metrics
475#[must_use]
476pub fn global_metrics() -> Arc<CacheMetrics> {
477    GLOBAL_METRICS
478        .get_or_init(|| Arc::new(CacheMetrics::new()))
479        .clone()
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_hit() {
        let metrics = CacheMetrics::new();
        metrics.record_hit(CachePolicy::Normal, "test-task");
        assert_eq!(metrics.hits(CachePolicy::Normal), 1);
        assert_eq!(metrics.hits(CachePolicy::Readonly), 0);
    }

    #[test]
    fn test_record_miss() {
        let metrics = CacheMetrics::new();
        metrics.record_miss(CachePolicy::Readonly, "test-task");
        assert_eq!(metrics.misses(CachePolicy::Readonly), 1);
        assert_eq!(metrics.misses(CachePolicy::Normal), 0);
    }

    #[test]
    fn test_record_restore_failure() {
        let metrics = CacheMetrics::new();
        metrics.record_restore_failure(RestoreErrorType::Connection, "test-task");
        assert_eq!(metrics.restore_failures(RestoreErrorType::Connection), 1);
        assert_eq!(metrics.restore_failures(RestoreErrorType::Timeout), 0);
    }

    #[test]
    fn test_hit_rate() {
        let metrics = CacheMetrics::new();
        metrics.record_hit(CachePolicy::Normal, "t1");
        metrics.record_hit(CachePolicy::Normal, "t2");
        metrics.record_hit(CachePolicy::Normal, "t3");
        metrics.record_miss(CachePolicy::Normal, "t4");

        let rate = metrics.hit_rate();
        assert!((rate - 0.75).abs() < 0.001);
    }

    #[test]
    #[allow(clippy::float_cmp)] // Comparing to exact 0.0 literal is safe
    fn test_hit_rate_zero() {
        let metrics = CacheMetrics::new();
        assert_eq!(metrics.hit_rate(), 0.0);
    }

    #[test]
    fn test_bytes_tracking() {
        let metrics = CacheMetrics::new();
        metrics.record_download(1000);
        metrics.record_upload(500);
        assert_eq!(metrics.bytes_downloaded(), 1000);
        assert_eq!(metrics.bytes_uploaded(), 500);
    }

    #[test]
    fn test_task_duration_tracking() {
        let metrics = CacheMetrics::new();
        metrics.record_task_duration("t1", 100);
        metrics.record_task_duration("t2", 200);
        assert_eq!(metrics.task_count(), 2);
        assert_eq!(metrics.total_task_time_ms(), 300);
        assert_eq!(metrics.avg_task_duration_ms(), 150);
    }

    #[test]
    fn test_runtime_materialization_tracking() {
        let metrics = CacheMetrics::new();
        metrics.record_runtime_materialization("rt", 2000);
        assert_eq!(metrics.runtime_count(), 1);
        assert_eq!(metrics.total_runtime_time_ms(), 2000);
    }

    #[test]
    fn test_zero_count_averages() {
        // Averages must not divide by zero when nothing has been recorded.
        let metrics = CacheMetrics::new();
        assert_eq!(metrics.avg_task_duration_ms(), 0);
        assert_eq!(metrics.avg_check_latency_us(), 0);
    }

    #[test]
    fn test_prometheus_format() {
        let metrics = CacheMetrics::new();
        metrics.record_hit(CachePolicy::Normal, "t1");
        metrics.record_miss(CachePolicy::Normal, "t2");

        let output = metrics.to_prometheus();
        assert!(output.contains("cuenv_cache_hit_total"));
        assert!(output.contains("cuenv_cache_miss_total"));
        assert!(output.contains("policy=\"normal\""));
    }

    #[test]
    fn test_avg_latency() {
        let metrics = CacheMetrics::new();
        metrics.record_check_latency(100);
        metrics.record_check_latency(200);
        metrics.record_check_latency(300);
        assert_eq!(metrics.avg_check_latency_us(), 200);
    }
}