1use crate::ir::CachePolicy;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10
/// Aggregated cache and task-execution metrics.
///
/// All counters are relaxed atomics, so recording is lock-free and cheap;
/// readers may observe slightly stale values, which is acceptable for metrics.
#[derive(Debug, Default)]
pub struct CacheMetrics {
    /// Cache hits, broken down by cache policy.
    hits: PolicyCounters,
    /// Cache misses, broken down by cache policy.
    misses: PolicyCounters,
    /// Failed cache restores, broken down by error category.
    restore_failures: ErrorCounters,
    /// Total bytes downloaded from the cache.
    bytes_downloaded: AtomicU64,
    /// Total bytes uploaded to the cache.
    bytes_uploaded: AtomicU64,
    /// Sum of cache-check latencies in microseconds (see `check_count` for the divisor).
    check_latency_us: AtomicU64,
    /// Number of cache checks recorded; pairs with `check_latency_us` for averaging.
    check_count: AtomicU64,
    /// Task execution duration aggregates.
    task_durations: TaskDurations,
    /// Runtime materialization duration aggregates.
    runtime_durations: RuntimeDurations,
}
33
/// Aggregated task execution durations.
#[derive(Debug, Default)]
struct TaskDurations {
    /// Sum of all recorded task durations, in microseconds.
    total_us: AtomicU64,
    /// Number of task durations recorded.
    count: AtomicU64,
    /// Latest recorded duration (microseconds) per task id; inserts overwrite
    /// any prior value for the same id.
    per_task: RwLock<HashMap<String, u64>>,
}
44
/// Aggregated runtime materialization durations.
#[derive(Debug, Default)]
struct RuntimeDurations {
    /// Sum of all recorded materialization durations, in microseconds.
    total_us: AtomicU64,
    /// Number of materializations recorded.
    count: AtomicU64,
    /// Latest recorded duration (microseconds) per runtime id; inserts
    /// overwrite any prior value for the same id.
    per_runtime: RwLock<HashMap<String, u64>>,
}
55
/// One atomic counter per cache policy.
#[derive(Debug, Default)]
struct PolicyCounters {
    normal: AtomicU64,
    readonly: AtomicU64,
    writeonly: AtomicU64,
    disabled: AtomicU64,
}
64
/// One atomic counter per restore-error category (see [`RestoreErrorType`]).
#[derive(Debug, Default)]
struct ErrorCounters {
    connection: AtomicU64,
    timeout: AtomicU64,
    not_found: AtomicU64,
    digest_mismatch: AtomicU64,
    other: AtomicU64,
}
74
75impl CacheMetrics {
76 #[must_use]
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn record_hit(&self, policy: CachePolicy, task_id: &str) {
84 self.hits.increment(policy);
85 tracing::debug!(
86 task = %task_id,
87 policy = ?policy,
88 metric = "cuenv_cache_hit_total",
89 "Cache hit recorded"
90 );
91 }
92
93 pub fn record_miss(&self, policy: CachePolicy, task_id: &str) {
95 self.misses.increment(policy);
96 tracing::debug!(
97 task = %task_id,
98 policy = ?policy,
99 metric = "cuenv_cache_miss_total",
100 "Cache miss recorded"
101 );
102 }
103
104 pub fn record_restore_failure(&self, error_type: RestoreErrorType, task_id: &str) {
106 self.restore_failures.increment(error_type);
107 tracing::debug!(
108 task = %task_id,
109 error_type = ?error_type,
110 metric = "cuenv_cache_restore_failure_total",
111 "Cache restore failure recorded"
112 );
113 }
114
115 pub fn record_download(&self, bytes: u64) {
117 self.bytes_downloaded.fetch_add(bytes, Ordering::Relaxed);
118 }
119
120 pub fn record_upload(&self, bytes: u64) {
122 self.bytes_uploaded.fetch_add(bytes, Ordering::Relaxed);
123 }
124
125 pub fn record_check_latency(&self, latency_us: u64) {
127 self.check_latency_us
128 .fetch_add(latency_us, Ordering::Relaxed);
129 self.check_count.fetch_add(1, Ordering::Relaxed);
130 }
131
132 pub fn record_task_duration(&self, task_id: &str, millis: u64) {
134 let micros = millis * 1000;
135 self.task_durations
136 .total_us
137 .fetch_add(micros, Ordering::Relaxed);
138 self.task_durations.count.fetch_add(1, Ordering::Relaxed);
139
140 if let Ok(mut map) = self.task_durations.per_task.write() {
141 map.insert(task_id.to_string(), micros);
142 }
143
144 tracing::debug!(
145 task = %task_id,
146 duration_ms = millis,
147 metric = "cuenv_task_duration_seconds",
148 "Task duration recorded"
149 );
150 }
151
152 pub fn record_runtime_materialization(&self, runtime_id: &str, millis: u64) {
154 let micros = millis * 1000;
155 self.runtime_durations
156 .total_us
157 .fetch_add(micros, Ordering::Relaxed);
158 self.runtime_durations.count.fetch_add(1, Ordering::Relaxed);
159
160 if let Ok(mut map) = self.runtime_durations.per_runtime.write() {
161 map.insert(runtime_id.to_string(), micros);
162 }
163
164 tracing::debug!(
165 runtime = %runtime_id,
166 duration_ms = millis,
167 metric = "cuenv_runtime_materialization_seconds",
168 "Runtime materialization recorded"
169 );
170 }
171
172 #[must_use]
174 pub fn total_task_time_ms(&self) -> u64 {
175 self.task_durations.total_us.load(Ordering::Relaxed) / 1000
176 }
177
178 #[must_use]
180 pub fn task_count(&self) -> u64 {
181 self.task_durations.count.load(Ordering::Relaxed)
182 }
183
184 #[must_use]
186 pub fn avg_task_duration_ms(&self) -> u64 {
187 let count = self.task_durations.count.load(Ordering::Relaxed);
188 if count == 0 {
189 return 0;
190 }
191 self.task_durations.total_us.load(Ordering::Relaxed) / count / 1000
192 }
193
194 #[must_use]
196 pub fn total_runtime_time_ms(&self) -> u64 {
197 self.runtime_durations.total_us.load(Ordering::Relaxed) / 1000
198 }
199
200 #[must_use]
202 pub fn runtime_count(&self) -> u64 {
203 self.runtime_durations.count.load(Ordering::Relaxed)
204 }
205
206 #[must_use]
208 pub fn hits(&self, policy: CachePolicy) -> u64 {
209 self.hits.get(policy)
210 }
211
212 #[must_use]
214 pub fn misses(&self, policy: CachePolicy) -> u64 {
215 self.misses.get(policy)
216 }
217
218 #[must_use]
220 pub fn restore_failures(&self, error_type: RestoreErrorType) -> u64 {
221 self.restore_failures.get(error_type)
222 }
223
224 #[must_use]
226 pub fn bytes_downloaded(&self) -> u64 {
227 self.bytes_downloaded.load(Ordering::Relaxed)
228 }
229
230 #[must_use]
232 pub fn bytes_uploaded(&self) -> u64 {
233 self.bytes_uploaded.load(Ordering::Relaxed)
234 }
235
236 #[must_use]
238 pub fn avg_check_latency_us(&self) -> u64 {
239 let count = self.check_count.load(Ordering::Relaxed);
240 if count == 0 {
241 return 0;
242 }
243 self.check_latency_us.load(Ordering::Relaxed) / count
244 }
245
246 #[must_use]
248 #[allow(clippy::cast_precision_loss)] pub fn hit_rate(&self) -> f64 {
250 let total_hits: u64 = [
251 CachePolicy::Normal,
252 CachePolicy::Readonly,
253 CachePolicy::Writeonly,
254 CachePolicy::Disabled,
255 ]
256 .iter()
257 .map(|p| self.hits(*p))
258 .sum();
259
260 let total_misses: u64 = [
261 CachePolicy::Normal,
262 CachePolicy::Readonly,
263 CachePolicy::Writeonly,
264 CachePolicy::Disabled,
265 ]
266 .iter()
267 .map(|p| self.misses(*p))
268 .sum();
269
270 let total = total_hits + total_misses;
271 if total == 0 {
272 return 0.0;
273 }
274 total_hits as f64 / total as f64
275 }
276
277 #[must_use]
279 #[allow(clippy::cast_precision_loss)] #[allow(clippy::too_many_lines)] pub fn to_prometheus(&self) -> String {
282 use std::fmt::Write;
283
284 let mut output = String::new();
285
286 output.push_str("# HELP cuenv_cache_hit_total Total number of cache hits\n");
288 output.push_str("# TYPE cuenv_cache_hit_total counter\n");
289 for policy in &["normal", "readonly", "writeonly", "disabled"] {
290 let count = match *policy {
291 "normal" => self.hits.normal.load(Ordering::Relaxed),
292 "readonly" => self.hits.readonly.load(Ordering::Relaxed),
293 "writeonly" => self.hits.writeonly.load(Ordering::Relaxed),
294 "disabled" => self.hits.disabled.load(Ordering::Relaxed),
295 _ => 0,
296 };
297 let _ = writeln!(
298 output,
299 "cuenv_cache_hit_total{{policy=\"{policy}\"}} {count}"
300 );
301 }
302
303 output.push_str("# HELP cuenv_cache_miss_total Total number of cache misses\n");
305 output.push_str("# TYPE cuenv_cache_miss_total counter\n");
306 for policy in &["normal", "readonly", "writeonly", "disabled"] {
307 let count = match *policy {
308 "normal" => self.misses.normal.load(Ordering::Relaxed),
309 "readonly" => self.misses.readonly.load(Ordering::Relaxed),
310 "writeonly" => self.misses.writeonly.load(Ordering::Relaxed),
311 "disabled" => self.misses.disabled.load(Ordering::Relaxed),
312 _ => 0,
313 };
314 let _ = writeln!(
315 output,
316 "cuenv_cache_miss_total{{policy=\"{policy}\"}} {count}"
317 );
318 }
319
320 output.push_str(
322 "# HELP cuenv_cache_restore_failure_total Total number of cache restore failures\n",
323 );
324 output.push_str("# TYPE cuenv_cache_restore_failure_total counter\n");
325 for error_type in &[
326 "connection",
327 "timeout",
328 "not_found",
329 "digest_mismatch",
330 "other",
331 ] {
332 let count = match *error_type {
333 "connection" => self.restore_failures.connection.load(Ordering::Relaxed),
334 "timeout" => self.restore_failures.timeout.load(Ordering::Relaxed),
335 "not_found" => self.restore_failures.not_found.load(Ordering::Relaxed),
336 "digest_mismatch" => self
337 .restore_failures
338 .digest_mismatch
339 .load(Ordering::Relaxed),
340 "other" => self.restore_failures.other.load(Ordering::Relaxed),
341 _ => 0,
342 };
343 let _ = writeln!(
344 output,
345 "cuenv_cache_restore_failure_total{{error_type=\"{error_type}\"}} {count}"
346 );
347 }
348
349 output.push_str(
351 "# HELP cuenv_cache_bytes_downloaded_total Total bytes downloaded from cache\n",
352 );
353 output.push_str("# TYPE cuenv_cache_bytes_downloaded_total counter\n");
354 let _ = writeln!(
355 output,
356 "cuenv_cache_bytes_downloaded_total {}",
357 self.bytes_downloaded.load(Ordering::Relaxed)
358 );
359
360 output.push_str("# HELP cuenv_cache_bytes_uploaded_total Total bytes uploaded to cache\n");
361 output.push_str("# TYPE cuenv_cache_bytes_uploaded_total counter\n");
362 let _ = writeln!(
363 output,
364 "cuenv_cache_bytes_uploaded_total {}",
365 self.bytes_uploaded.load(Ordering::Relaxed)
366 );
367
368 output.push_str(
370 "# HELP cuenv_task_duration_seconds_total Total task execution time in seconds\n",
371 );
372 output.push_str("# TYPE cuenv_task_duration_seconds_total counter\n");
373 let task_total_secs =
374 self.task_durations.total_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
375 let _ = writeln!(
376 output,
377 "cuenv_task_duration_seconds_total {task_total_secs:.3}"
378 );
379
380 output.push_str("# HELP cuenv_tasks_executed_total Total number of tasks executed\n");
381 output.push_str("# TYPE cuenv_tasks_executed_total counter\n");
382 let _ = writeln!(
383 output,
384 "cuenv_tasks_executed_total {}",
385 self.task_durations.count.load(Ordering::Relaxed)
386 );
387
388 output.push_str("# HELP cuenv_runtime_materialization_seconds_total Total runtime materialization time in seconds\n");
390 output.push_str("# TYPE cuenv_runtime_materialization_seconds_total counter\n");
391 let runtime_total_secs =
392 self.runtime_durations.total_us.load(Ordering::Relaxed) as f64 / 1_000_000.0;
393 let _ = writeln!(
394 output,
395 "cuenv_runtime_materialization_seconds_total {runtime_total_secs:.3}"
396 );
397
398 output.push_str(
399 "# HELP cuenv_runtimes_materialized_total Total number of runtimes materialized\n",
400 );
401 output.push_str("# TYPE cuenv_runtimes_materialized_total counter\n");
402 let _ = writeln!(
403 output,
404 "cuenv_runtimes_materialized_total {}",
405 self.runtime_durations.count.load(Ordering::Relaxed)
406 );
407
408 output
409 }
410}
411
412impl PolicyCounters {
413 fn increment(&self, policy: CachePolicy) {
414 match policy {
415 CachePolicy::Normal => self.normal.fetch_add(1, Ordering::Relaxed),
416 CachePolicy::Readonly => self.readonly.fetch_add(1, Ordering::Relaxed),
417 CachePolicy::Writeonly => self.writeonly.fetch_add(1, Ordering::Relaxed),
418 CachePolicy::Disabled => self.disabled.fetch_add(1, Ordering::Relaxed),
419 };
420 }
421
422 fn get(&self, policy: CachePolicy) -> u64 {
423 match policy {
424 CachePolicy::Normal => self.normal.load(Ordering::Relaxed),
425 CachePolicy::Readonly => self.readonly.load(Ordering::Relaxed),
426 CachePolicy::Writeonly => self.writeonly.load(Ordering::Relaxed),
427 CachePolicy::Disabled => self.disabled.load(Ordering::Relaxed),
428 }
429 }
430}
431
/// Category of a cache restore failure, used to label failure counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreErrorType {
    /// Failed to connect to the cache backend.
    Connection,
    /// The restore operation timed out.
    Timeout,
    /// The requested entry was not found in the cache.
    NotFound,
    /// The restored content's digest did not match the expected digest.
    DigestMismatch,
    /// Any failure not covered by the other categories.
    Other,
}
446
447impl ErrorCounters {
448 fn increment(&self, error_type: RestoreErrorType) {
449 match error_type {
450 RestoreErrorType::Connection => self.connection.fetch_add(1, Ordering::Relaxed),
451 RestoreErrorType::Timeout => self.timeout.fetch_add(1, Ordering::Relaxed),
452 RestoreErrorType::NotFound => self.not_found.fetch_add(1, Ordering::Relaxed),
453 RestoreErrorType::DigestMismatch => {
454 self.digest_mismatch.fetch_add(1, Ordering::Relaxed)
455 }
456 RestoreErrorType::Other => self.other.fetch_add(1, Ordering::Relaxed),
457 };
458 }
459
460 fn get(&self, error_type: RestoreErrorType) -> u64 {
461 match error_type {
462 RestoreErrorType::Connection => self.connection.load(Ordering::Relaxed),
463 RestoreErrorType::Timeout => self.timeout.load(Ordering::Relaxed),
464 RestoreErrorType::NotFound => self.not_found.load(Ordering::Relaxed),
465 RestoreErrorType::DigestMismatch => self.digest_mismatch.load(Ordering::Relaxed),
466 RestoreErrorType::Other => self.other.load(Ordering::Relaxed),
467 }
468 }
469}
470
471static GLOBAL_METRICS: std::sync::OnceLock<Arc<CacheMetrics>> = std::sync::OnceLock::new();
473
474#[must_use]
476pub fn global_metrics() -> Arc<CacheMetrics> {
477 GLOBAL_METRICS
478 .get_or_init(|| Arc::new(CacheMetrics::new()))
479 .clone()
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_record_hit() {
        let m = CacheMetrics::new();
        m.record_hit(CachePolicy::Normal, "test-task");
        assert_eq!(m.hits(CachePolicy::Normal), 1);
        assert_eq!(m.hits(CachePolicy::Readonly), 0);
    }

    #[test]
    fn test_record_miss() {
        let m = CacheMetrics::new();
        m.record_miss(CachePolicy::Readonly, "test-task");
        assert_eq!(m.misses(CachePolicy::Readonly), 1);
        assert_eq!(m.misses(CachePolicy::Normal), 0);
    }

    #[test]
    fn test_record_restore_failure() {
        let m = CacheMetrics::new();
        m.record_restore_failure(RestoreErrorType::Connection, "test-task");
        assert_eq!(m.restore_failures(RestoreErrorType::Connection), 1);
        assert_eq!(m.restore_failures(RestoreErrorType::Timeout), 0);
    }

    #[test]
    fn test_hit_rate() {
        let m = CacheMetrics::new();
        // Three hits and one miss -> 75% hit rate.
        for task in ["t1", "t2", "t3"] {
            m.record_hit(CachePolicy::Normal, task);
        }
        m.record_miss(CachePolicy::Normal, "t4");

        assert!((m.hit_rate() - 0.75).abs() < 0.001);
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_hit_rate_zero() {
        // With no recorded activity the rate must be exactly 0.0.
        assert_eq!(CacheMetrics::new().hit_rate(), 0.0);
    }

    #[test]
    fn test_bytes_tracking() {
        let m = CacheMetrics::new();
        m.record_download(1000);
        m.record_upload(500);
        assert_eq!(m.bytes_downloaded(), 1000);
        assert_eq!(m.bytes_uploaded(), 500);
    }

    #[test]
    fn test_prometheus_format() {
        let m = CacheMetrics::new();
        m.record_hit(CachePolicy::Normal, "t1");
        m.record_miss(CachePolicy::Normal, "t2");

        let rendered = m.to_prometheus();
        for needle in [
            "cuenv_cache_hit_total",
            "cuenv_cache_miss_total",
            "policy=\"normal\"",
        ] {
            assert!(rendered.contains(needle));
        }
    }

    #[test]
    fn test_avg_latency() {
        let m = CacheMetrics::new();
        for latency in [100, 200, 300] {
            m.record_check_latency(latency);
        }
        assert_eq!(m.avg_check_latency_us(), 200);
    }
}