1use std::collections::HashMap;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::RwLock;
40use std::time::{Duration, Instant};
41
/// Settings controlling how request metrics are named, labelled and bucketed.
#[derive(Clone, Debug)]
pub struct MetricsConfig {
    /// Prefix placed in front of every exported metric name.
    pub namespace: String,
    /// Whether request metrics carry an `endpoint` label.
    pub endpoint_label: bool,
    /// Whether request metrics carry a `method` label.
    pub method_label: bool,
    /// Upper bounds, in seconds, of the request-duration histogram buckets.
    pub histogram_buckets: Vec<f64>,
}

impl Default for MetricsConfig {
    /// Namespace `talos_client`, both labels enabled, and latency buckets
    /// ranging from 1 ms to 10 s.
    fn default() -> Self {
        const DEFAULT_BUCKETS: [f64; 12] = [
            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
        ];
        MetricsConfig {
            histogram_buckets: DEFAULT_BUCKETS.to_vec(),
            method_label: true,
            endpoint_label: true,
            namespace: String::from("talos_client"),
        }
    }
}

impl MetricsConfig {
    /// Starts a [`MetricsConfigBuilder`] with nothing overridden; any setting
    /// left unset falls back to [`MetricsConfig::default`] at `build` time.
    pub fn builder() -> MetricsConfigBuilder {
        <MetricsConfigBuilder as Default>::default()
    }
}

/// Incremental builder for [`MetricsConfig`]. Each field is `None` until the
/// matching setter is called.
#[derive(Default, Debug)]
pub struct MetricsConfigBuilder {
    namespace: Option<String>,
    endpoint_label: Option<bool>,
    method_label: Option<bool>,
    histogram_buckets: Option<Vec<f64>>,
}

impl MetricsConfigBuilder {
    /// Overrides the metric-name prefix.
    pub fn namespace(self, namespace: impl Into<String>) -> Self {
        Self {
            namespace: Some(namespace.into()),
            ..self
        }
    }

    /// Enables or disables the `endpoint` label.
    pub fn endpoint_label(self, enabled: bool) -> Self {
        Self {
            endpoint_label: Some(enabled),
            ..self
        }
    }

    /// Enables or disables the `method` label.
    pub fn method_label(self, enabled: bool) -> Self {
        Self {
            method_label: Some(enabled),
            ..self
        }
    }

    /// Replaces the histogram bucket upper bounds (in seconds).
    pub fn histogram_buckets(self, buckets: Vec<f64>) -> Self {
        Self {
            histogram_buckets: Some(buckets),
            ..self
        }
    }

    /// Produces the final configuration, filling every unset field from
    /// [`MetricsConfig::default`].
    pub fn build(self) -> MetricsConfig {
        let MetricsConfig {
            namespace: default_namespace,
            endpoint_label: default_endpoint_label,
            method_label: default_method_label,
            histogram_buckets: default_buckets,
        } = MetricsConfig::default();

        MetricsConfig {
            namespace: self.namespace.unwrap_or(default_namespace),
            endpoint_label: self.endpoint_label.unwrap_or(default_endpoint_label),
            method_label: self.method_label.unwrap_or(default_method_label),
            histogram_buckets: self.histogram_buckets.unwrap_or(default_buckets),
        }
    }
}
120
/// Label set identifying one `requests_total` counter series.
///
/// `method` and `endpoint` are `None` when the corresponding label is
/// disabled in the configuration.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct Labels {
    /// Request method name, when method labelling is enabled.
    method: Option<String>,
    /// Target endpoint, when endpoint labelling is enabled.
    endpoint: Option<String>,
    /// Request outcome: `"success"` or `"error"`.
    status: String,
}
128
/// Cumulative latency histogram updated with atomics only.
///
/// `counts[i]` holds the number of observations `<= buckets[i]`, so counts are
/// already cumulative (Prometheus `le` semantics). Observations larger than
/// every bound are reflected only in `count`, which doubles as the `+Inf`
/// bucket.
#[derive(Debug)]
struct Histogram {
    /// Bucket upper bounds in seconds: sorted ascending, de-duplicated, and
    /// free of NaN and `+Inf` (see [`Histogram::new`]).
    buckets: Vec<f64>,
    /// Cumulative observation count for each entry of `buckets`.
    counts: Vec<AtomicU64>,
    /// Sum of all observations in whole nanoseconds, so it fits an atomic integer.
    sum: AtomicU64,
    /// Total number of observations (the implicit `+Inf` bucket).
    count: AtomicU64,
}

impl Histogram {
    /// Creates an empty histogram with the given upper bounds (seconds).
    ///
    /// The bounds are normalized. `observe` relies on ascending order to keep
    /// counts cumulative, but user-supplied lists may be unsorted or contain
    /// duplicates. NaN bounds are dropped because no value compares below
    /// them. `+Inf` is dropped because the `+Inf` bucket is always derived
    /// from `count`.
    fn new(mut buckets: Vec<f64>) -> Self {
        buckets.retain(|bound| !bound.is_nan() && *bound != f64::INFINITY);
        buckets.sort_by(|a, b| a.partial_cmp(b).expect("NaN bounds were removed above"));
        buckets.dedup();

        let counts = buckets.iter().map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Records one observation of `value_secs` seconds.
    fn observe(&self, value_secs: f64) {
        // Bounds are ascending, so every bucket from the first one that admits
        // the value onward admits it too.
        if let Some(first) = self.buckets.iter().position(|bound| value_secs <= *bound) {
            for counter in &self.counts[first..] {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Round instead of truncating so floating-point error cannot shave a
        // nanosecond off an observation. `as` saturates, so negative or NaN
        // input contributes 0.
        let nanos = (value_secs * 1_000_000_000.0).round() as u64;
        self.sum.fetch_add(nanos, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Sum of all observations, in seconds.
    fn sum_secs(&self) -> f64 {
        self.sum.load(Ordering::Relaxed) as f64 / 1_000_000_000.0
    }

    /// Number of observations recorded so far.
    fn total_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}
174
/// Identifies one duration histogram by its optional label values:
/// `(method, endpoint)`, each `None` when that label is disabled.
type HistogramKey = (
    /* method */ Option<String>,
    /* endpoint */ Option<String>,
);
177
178#[derive(Debug)]
180pub struct MetricsCollector {
181 config: MetricsConfig,
182 requests_total: RwLock<HashMap<Labels, AtomicU64>>,
184 request_duration: RwLock<HashMap<HistogramKey, Histogram>>,
186 circuit_breaker_state: AtomicU64,
188 circuit_breaker_rejections: AtomicU64,
190 pool_healthy_endpoints: AtomicU64,
192 pool_total_endpoints: AtomicU64,
194 pool_failovers: AtomicU64,
196 start_time: Instant,
198}
199
200impl MetricsCollector {
201 pub fn new(config: MetricsConfig) -> Self {
203 Self {
204 config,
205 requests_total: RwLock::new(HashMap::new()),
206 request_duration: RwLock::new(HashMap::new()),
207 circuit_breaker_state: AtomicU64::new(0),
208 circuit_breaker_rejections: AtomicU64::new(0),
209 pool_healthy_endpoints: AtomicU64::new(0),
210 pool_total_endpoints: AtomicU64::new(0),
211 pool_failovers: AtomicU64::new(0),
212 start_time: Instant::now(),
213 }
214 }
215
216 pub fn with_defaults() -> Self {
218 Self::new(MetricsConfig::default())
219 }
220
221 pub fn record_request(&self, method: &str, endpoint: &str, success: bool, duration: Duration) {
223 let labels = Labels {
224 method: if self.config.method_label {
225 Some(method.to_string())
226 } else {
227 None
228 },
229 endpoint: if self.config.endpoint_label {
230 Some(endpoint.to_string())
231 } else {
232 None
233 },
234 status: if success { "success" } else { "error" }.to_string(),
235 };
236
237 {
239 let counters = self.requests_total.read().expect("lock poisoned");
240 if let Some(counter) = counters.get(&labels) {
241 counter.fetch_add(1, Ordering::Relaxed);
242 } else {
243 drop(counters);
244 let mut counters = self.requests_total.write().expect("lock poisoned");
245 counters
246 .entry(labels)
247 .or_insert_with(|| AtomicU64::new(0))
248 .fetch_add(1, Ordering::Relaxed);
249 }
250 }
251
252 let hist_key = (
254 if self.config.method_label {
255 Some(method.to_string())
256 } else {
257 None
258 },
259 if self.config.endpoint_label {
260 Some(endpoint.to_string())
261 } else {
262 None
263 },
264 );
265
266 {
267 let histograms = self.request_duration.read().expect("lock poisoned");
268 if let Some(hist) = histograms.get(&hist_key) {
269 hist.observe(duration.as_secs_f64());
270 } else {
271 drop(histograms);
272 let mut histograms = self.request_duration.write().expect("lock poisoned");
273 let hist = histograms
274 .entry(hist_key)
275 .or_insert_with(|| Histogram::new(self.config.histogram_buckets.clone()));
276 hist.observe(duration.as_secs_f64());
277 }
278 }
279 }
280
281 pub fn set_circuit_breaker_state(&self, state: u64) {
283 self.circuit_breaker_state.store(state, Ordering::Relaxed);
284 }
285
286 pub fn record_circuit_breaker_rejection(&self) {
288 self.circuit_breaker_rejections
289 .fetch_add(1, Ordering::Relaxed);
290 }
291
292 pub fn set_pool_endpoints(&self, healthy: u64, total: u64) {
294 self.pool_healthy_endpoints
295 .store(healthy, Ordering::Relaxed);
296 self.pool_total_endpoints.store(total, Ordering::Relaxed);
297 }
298
299 pub fn record_pool_failover(&self) {
301 self.pool_failovers.fetch_add(1, Ordering::Relaxed);
302 }
303
304 pub fn total_requests(&self) -> u64 {
306 let counters = self.requests_total.read().expect("lock poisoned");
307 counters.values().map(|c| c.load(Ordering::Relaxed)).sum()
308 }
309
310 pub fn successful_requests(&self) -> u64 {
312 let counters = self.requests_total.read().expect("lock poisoned");
313 counters
314 .iter()
315 .filter(|(labels, _)| labels.status == "success")
316 .map(|(_, c)| c.load(Ordering::Relaxed))
317 .sum()
318 }
319
320 pub fn failed_requests(&self) -> u64 {
322 let counters = self.requests_total.read().expect("lock poisoned");
323 counters
324 .iter()
325 .filter(|(labels, _)| labels.status == "error")
326 .map(|(_, c)| c.load(Ordering::Relaxed))
327 .sum()
328 }
329
330 pub fn uptime(&self) -> Duration {
332 self.start_time.elapsed()
333 }
334
335 pub fn to_prometheus_text(&self) -> String {
337 let mut output = String::new();
338 let ns = &self.config.namespace;
339
340 output.push_str(&format!(
342 "# HELP {ns}_requests_total Total number of requests\n"
343 ));
344 output.push_str(&format!("# TYPE {ns}_requests_total counter\n"));
345 {
346 let counters = self.requests_total.read().expect("lock poisoned");
347 for (labels, count) in counters.iter() {
348 let mut label_parts = vec![format!("status=\"{}\"", labels.status)];
349 if let Some(ref method) = labels.method {
350 label_parts.insert(0, format!("method=\"{method}\""));
351 }
352 if let Some(ref endpoint) = labels.endpoint {
353 label_parts.insert(1, format!("endpoint=\"{endpoint}\""));
354 }
355 let label_str = label_parts.join(",");
356 output.push_str(&format!(
357 "{ns}_requests_total{{{label_str}}} {}\n",
358 count.load(Ordering::Relaxed)
359 ));
360 }
361 }
362 output.push('\n');
363
364 output.push_str(&format!(
366 "# HELP {ns}_request_duration_seconds Request duration in seconds\n"
367 ));
368 output.push_str(&format!("# TYPE {ns}_request_duration_seconds histogram\n"));
369 {
370 let histograms = self.request_duration.read().expect("lock poisoned");
371 for ((method, endpoint), hist) in histograms.iter() {
372 let base_labels = match (method, endpoint) {
373 (Some(m), Some(e)) => format!("method=\"{m}\",endpoint=\"{e}\""),
374 (Some(m), None) => format!("method=\"{m}\""),
375 (None, Some(e)) => format!("endpoint=\"{e}\""),
376 (None, None) => String::new(),
377 };
378
379 for (i, bucket) in hist.buckets.iter().enumerate() {
381 let count = hist.counts[i].load(Ordering::Relaxed);
382 let le = if *bucket == f64::INFINITY {
383 "+Inf".to_string()
384 } else {
385 format!("{bucket}")
386 };
387 if base_labels.is_empty() {
388 output.push_str(&format!(
389 "{ns}_request_duration_seconds_bucket{{le=\"{le}\"}} {count}\n"
390 ));
391 } else {
392 output.push_str(&format!(
393 "{ns}_request_duration_seconds_bucket{{{base_labels},le=\"{le}\"}} {count}\n"
394 ));
395 }
396 }
397
398 let inf_count = hist.total_count();
400 if base_labels.is_empty() {
401 output.push_str(&format!(
402 "{ns}_request_duration_seconds_bucket{{le=\"+Inf\"}} {inf_count}\n"
403 ));
404 } else {
405 output.push_str(&format!(
406 "{ns}_request_duration_seconds_bucket{{{base_labels},le=\"+Inf\"}} {inf_count}\n"
407 ));
408 }
409
410 if base_labels.is_empty() {
412 output.push_str(&format!(
413 "{ns}_request_duration_seconds_sum {}\n",
414 hist.sum_secs()
415 ));
416 output.push_str(&format!(
417 "{ns}_request_duration_seconds_count {inf_count}\n"
418 ));
419 } else {
420 output.push_str(&format!(
421 "{ns}_request_duration_seconds_sum{{{base_labels}}} {}\n",
422 hist.sum_secs()
423 ));
424 output.push_str(&format!(
425 "{ns}_request_duration_seconds_count{{{base_labels}}} {inf_count}\n"
426 ));
427 }
428 }
429 }
430 output.push('\n');
431
432 output.push_str(&format!(
434 "# HELP {ns}_circuit_breaker_state Circuit breaker state (0=closed, 1=half-open, 2=open)\n"
435 ));
436 output.push_str(&format!("# TYPE {ns}_circuit_breaker_state gauge\n"));
437 output.push_str(&format!(
438 "{ns}_circuit_breaker_state {}\n\n",
439 self.circuit_breaker_state.load(Ordering::Relaxed)
440 ));
441
442 output.push_str(&format!(
443 "# HELP {ns}_circuit_breaker_rejections_total Requests rejected by circuit breaker\n"
444 ));
445 output.push_str(&format!(
446 "# TYPE {ns}_circuit_breaker_rejections_total counter\n"
447 ));
448 output.push_str(&format!(
449 "{ns}_circuit_breaker_rejections_total {}\n\n",
450 self.circuit_breaker_rejections.load(Ordering::Relaxed)
451 ));
452
453 output.push_str(&format!(
455 "# HELP {ns}_pool_healthy_endpoints Number of healthy endpoints in pool\n"
456 ));
457 output.push_str(&format!("# TYPE {ns}_pool_healthy_endpoints gauge\n"));
458 output.push_str(&format!(
459 "{ns}_pool_healthy_endpoints {}\n\n",
460 self.pool_healthy_endpoints.load(Ordering::Relaxed)
461 ));
462
463 output.push_str(&format!(
464 "# HELP {ns}_pool_total_endpoints Total endpoints in pool\n"
465 ));
466 output.push_str(&format!("# TYPE {ns}_pool_total_endpoints gauge\n"));
467 output.push_str(&format!(
468 "{ns}_pool_total_endpoints {}\n\n",
469 self.pool_total_endpoints.load(Ordering::Relaxed)
470 ));
471
472 output.push_str(&format!(
473 "# HELP {ns}_pool_failovers_total Connection pool failover events\n"
474 ));
475 output.push_str(&format!("# TYPE {ns}_pool_failovers_total counter\n"));
476 output.push_str(&format!(
477 "{ns}_pool_failovers_total {}\n\n",
478 self.pool_failovers.load(Ordering::Relaxed)
479 ));
480
481 output.push_str(&format!(
483 "# HELP {ns}_uptime_seconds Client uptime in seconds\n"
484 ));
485 output.push_str(&format!("# TYPE {ns}_uptime_seconds gauge\n"));
486 output.push_str(&format!(
487 "{ns}_uptime_seconds {}\n",
488 self.uptime().as_secs_f64()
489 ));
490
491 output
492 }
493}
494
495impl Default for MetricsCollector {
496 fn default() -> Self {
497 Self::with_defaults()
498 }
499}
500
/// Point-in-time copy of the values tracked by a `MetricsCollector`.
#[derive(Clone, Debug)]
pub struct MetricsSnapshot {
    /// Requests recorded so far, regardless of outcome.
    pub total_requests: u64,
    /// Requests recorded as successful.
    pub successful_requests: u64,
    /// Requests recorded as failed.
    pub failed_requests: u64,
    /// Last reported circuit-breaker state.
    pub circuit_breaker_state: u64,
    /// Requests rejected by the circuit breaker.
    pub circuit_breaker_rejections: u64,
    /// Last reported number of healthy pool endpoints.
    pub pool_healthy_endpoints: u64,
    /// Last reported total number of pool endpoints.
    pub pool_total_endpoints: u64,
    /// Pool failover events recorded.
    pub pool_failovers: u64,
    /// Time since the collector was created.
    pub uptime: Duration,
}
523
524impl MetricsCollector {
525 pub fn snapshot(&self) -> MetricsSnapshot {
527 MetricsSnapshot {
528 total_requests: self.total_requests(),
529 successful_requests: self.successful_requests(),
530 failed_requests: self.failed_requests(),
531 circuit_breaker_state: self.circuit_breaker_state.load(Ordering::Relaxed),
532 circuit_breaker_rejections: self.circuit_breaker_rejections.load(Ordering::Relaxed),
533 pool_healthy_endpoints: self.pool_healthy_endpoints.load(Ordering::Relaxed),
534 pool_total_endpoints: self.pool_total_endpoints.load(Ordering::Relaxed),
535 pool_failovers: self.pool_failovers.load(Ordering::Relaxed),
536 uptime: self.uptime(),
537 }
538 }
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads an atomic counter for assertions (the tests are single-threaded).
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Returns `(total, successful, failed)` request counts for `metrics`.
    fn request_counts(metrics: &MetricsCollector) -> (u64, u64, u64) {
        (
            metrics.total_requests(),
            metrics.successful_requests(),
            metrics.failed_requests(),
        )
    }

    #[test]
    fn test_metrics_config_default() {
        let cfg = MetricsConfig::default();
        assert_eq!(cfg.namespace, "talos_client");
        assert!(cfg.endpoint_label && cfg.method_label);
        assert!(!cfg.histogram_buckets.is_empty());
    }

    #[test]
    fn test_metrics_config_builder() {
        let buckets = vec![0.1, 0.5, 1.0];
        let cfg = MetricsConfig::builder()
            .namespace("my_talos")
            .endpoint_label(false)
            .method_label(true)
            .histogram_buckets(buckets.clone())
            .build();

        assert_eq!(cfg.namespace, "my_talos");
        assert!(!cfg.endpoint_label);
        assert!(cfg.method_label);
        assert_eq!(cfg.histogram_buckets, buckets);
    }

    #[test]
    fn test_record_request_success() {
        let metrics = MetricsCollector::with_defaults();
        metrics.record_request("Version", "10.0.0.1:50000", true, Duration::from_millis(42));
        assert_eq!(request_counts(&metrics), (1, 1, 0));
    }

    #[test]
    fn test_record_request_failure() {
        let metrics = MetricsCollector::with_defaults();
        metrics.record_request("Version", "10.0.0.1:50000", false, Duration::from_millis(100));
        assert_eq!(request_counts(&metrics), (1, 0, 1));
    }

    #[test]
    fn test_multiple_requests() {
        let metrics = MetricsCollector::with_defaults();
        let calls = [
            ("Version", "10.0.0.1:50000", true, 10),
            ("Hostname", "10.0.0.1:50000", true, 20),
            ("Version", "10.0.0.2:50000", false, 30),
        ];
        for &(method, endpoint, ok, millis) in &calls {
            metrics.record_request(method, endpoint, ok, Duration::from_millis(millis));
        }
        assert_eq!(request_counts(&metrics), (3, 2, 1));
    }

    #[test]
    fn test_circuit_breaker_metrics() {
        let metrics = MetricsCollector::with_defaults();

        for &state in &[0_u64, 2] {
            metrics.set_circuit_breaker_state(state);
            assert_eq!(read(&metrics.circuit_breaker_state), state);
        }

        for _ in 0..2 {
            metrics.record_circuit_breaker_rejection();
        }
        assert_eq!(read(&metrics.circuit_breaker_rejections), 2);
    }

    #[test]
    fn test_pool_metrics() {
        let metrics = MetricsCollector::with_defaults();

        metrics.set_pool_endpoints(3, 5);
        assert_eq!(
            (
                read(&metrics.pool_healthy_endpoints),
                read(&metrics.pool_total_endpoints)
            ),
            (3, 5)
        );

        metrics.record_pool_failover();
        assert_eq!(read(&metrics.pool_failovers), 1);
    }

    #[test]
    fn test_snapshot() {
        let metrics = MetricsCollector::with_defaults();
        metrics.record_request("Version", "10.0.0.1:50000", true, Duration::from_millis(10));
        metrics.set_circuit_breaker_state(1);
        metrics.set_pool_endpoints(2, 3);

        let snap = metrics.snapshot();
        assert_eq!(snap.total_requests, 1);
        assert_eq!(snap.successful_requests, 1);
        assert_eq!(snap.circuit_breaker_state, 1);
        assert_eq!(snap.pool_healthy_endpoints, 2);
        assert_eq!(snap.pool_total_endpoints, 3);
    }

    #[test]
    fn test_prometheus_text_format() {
        let metrics = MetricsCollector::new(MetricsConfig {
            namespace: "test".to_string(),
            endpoint_label: false,
            method_label: true,
            histogram_buckets: vec![0.1, 1.0],
        });
        metrics.record_request("Version", "10.0.0.1:50000", true, Duration::from_millis(50));

        let text = metrics.to_prometheus_text();
        let expected_fragments = [
            "# HELP test_requests_total",
            "# TYPE test_requests_total counter",
            "test_requests_total{method=\"Version\",status=\"success\"}",
            "# HELP test_request_duration_seconds",
            "test_request_duration_seconds_bucket",
            "test_circuit_breaker_state",
            "test_pool_healthy_endpoints",
            "test_uptime_seconds",
        ];
        for fragment in expected_fragments.iter() {
            assert!(text.contains(*fragment), "missing {fragment:?} in:\n{text}");
        }
    }

    #[test]
    fn test_histogram_buckets() {
        let hist = Histogram::new(vec![0.01, 0.1, 1.0]);
        hist.observe(0.05);

        let cumulative: Vec<u64> = hist.counts.iter().map(read).collect();
        assert_eq!(cumulative, vec![0, 1, 1]);
        assert_eq!(hist.total_count(), 1);
    }

    #[test]
    fn test_metrics_without_labels() {
        let metrics = MetricsCollector::new(
            MetricsConfig::builder()
                .endpoint_label(false)
                .method_label(false)
                .build(),
        );
        metrics.record_request("Version", "10.0.0.1:50000", true, Duration::from_millis(10));

        let text = metrics.to_prometheus_text();
        assert!(text.contains("status=\"success\""));
        assert!(!text.contains("method=\"Version\""));
        assert!(!text.contains("endpoint="));
    }

    #[test]
    fn test_uptime_increases() {
        let metrics = MetricsCollector::with_defaults();
        let before = metrics.uptime();
        std::thread::sleep(Duration::from_millis(10));
        assert!(metrics.uptime() > before);
    }
}