1pub use metrics;
26use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, Recorder, Unit};
27pub use metrics_exporter_prometheus;
29pub use metrics_util;
31
32use actix_web::{HttpResponse, Responder, Scope, web};
33use anyhow::Result;
34use log::debug;
35use log_once::debug_once;
36use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
37use metrics_util::layers::FanoutBuilder;
38use mime_guess::from_path;
39use rust_embed::Embed;
40use std::{
41 collections::HashMap,
42 sync::{
43 Arc, Mutex, OnceLock,
44 atomic::{AtomicBool, Ordering},
45 },
46 time::{Duration, Instant},
47};
48
49static IS_CONFIGURED: AtomicBool = AtomicBool::new(false);
51
52static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
54
55static UNITS_FOR_METRICS: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
60
61static RATE_TRACKERS: OnceLock<Mutex<HashMap<String, RateTracker>>> = OnceLock::new();
65
66#[derive(Embed)]
68#[folder = "public/"]
69struct Asset;
70
71#[derive(Debug, Clone)]
77pub struct RateTracker {
78 samples: Vec<(f64, Instant)>,
79 window_duration: Duration,
80 max_samples: usize,
81 last_value: f64, start_time: Option<Instant>, last_calculated_rate: f64, last_update_time: Option<Instant>, }
86
87impl Default for RateTracker {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl RateTracker {
94 pub fn new() -> Self {
96 Self {
97 samples: Vec::new(),
98 window_duration: Duration::from_secs(10), max_samples: 200, last_value: 0.0,
101 start_time: None,
102 last_calculated_rate: 0.0,
103 last_update_time: None,
104 }
105 }
106
107 pub fn update(&mut self, new_value: f64) -> f64 {
115 let now = Instant::now();
116
117 if self.start_time.is_none() {
119 self.start_time = Some(now);
120 self.last_update_time = Some(now);
121 self.last_value = new_value;
122 self.samples.push((new_value, now));
123 return 0.0; }
125
126 let short_term_rate = if let Some(last_time) = self.last_update_time {
128 let elapsed = now.duration_since(last_time).as_secs_f64();
129 if elapsed > 0.001 && new_value > self.last_value {
131 let instant_rate = (new_value - self.last_value) / elapsed;
132 if self.last_calculated_rate > 0.0 {
134 0.3 * instant_rate + 0.7 * self.last_calculated_rate
136 } else {
137 instant_rate
138 }
139 } else {
140 self.last_calculated_rate
142 }
143 } else {
144 0.0
145 };
146
147 if new_value == self.last_value {
149 return short_term_rate;
150 }
151
152 if new_value < self.last_value {
154 self.samples.clear();
156 self.start_time = Some(now);
157 self.last_value = new_value;
158 self.last_update_time = Some(now);
159 self.samples.push((new_value, now));
160 self.last_calculated_rate = 0.0;
161 return 0.0;
162 }
163
164 self.samples.push((new_value, now));
166 self.last_value = new_value;
167 self.last_update_time = Some(now);
168
169 let cutoff = now - self.window_duration;
171 self.samples.retain(|(_, timestamp)| *timestamp > cutoff);
172
173 if self.samples.len() > self.max_samples {
175 let excess = self.samples.len() - self.max_samples;
176 self.samples.drain(0..excess);
177 }
178
179 if self.samples.len() < 2 {
181 if let Some(start) = self.start_time {
182 let elapsed = now.duration_since(start).as_secs_f64();
183 if elapsed > 0.0 {
184 let first_value = self.samples[0].0;
186 let rate = (new_value - first_value) / elapsed;
187 self.last_calculated_rate = rate.max(0.0);
188 return self.last_calculated_rate;
189 }
190 }
191 return short_term_rate;
192 }
193
194 let (first_value, first_time) = self.samples[0];
196 let (last_value, last_time) = self.samples[self.samples.len() - 1];
197
198 let time_diff = last_time.duration_since(first_time).as_secs_f64();
199
200 if time_diff <= 0.0 {
201 return short_term_rate;
202 }
203
204 let value_diff = last_value - first_value;
205
206 let long_term_rate = (value_diff / time_diff).max(0.0);
208
209 let rate = if (long_term_rate - short_term_rate).abs() > long_term_rate * 0.5 {
212 long_term_rate
213 } else {
214 0.7 * long_term_rate + 0.3 * short_term_rate
216 };
217
218 self.last_calculated_rate = rate;
219 rate
220 }
221}
222
223#[derive(Debug, Clone, Default)]
225pub struct DashboardInput<'a> {
226 pub buckets_for_metrics: Vec<(Matcher, &'a [f64])>,
243}
244
245#[derive(Debug)]
257struct UnitRecorder;
258
259#[derive(Clone, Debug)]
264#[allow(dead_code)]
265struct UnitRecorderHandle(Key);
266
267impl CounterFn for UnitRecorderHandle {
268 fn increment(&self, _value: u64) {
269 }
271
272 fn absolute(&self, _value: u64) {
273 }
275}
276
277impl GaugeFn for UnitRecorderHandle {
278 fn increment(&self, _value: f64) {
279 }
281
282 fn decrement(&self, _value: f64) {
283 }
285
286 fn set(&self, _value: f64) {
287 }
289}
290
291impl HistogramFn for UnitRecorderHandle {
292 fn record(&self, _value: f64) {
293 }
295}
296
297impl Recorder for UnitRecorder {
298 fn describe_counter(
299 &self,
300 key: metrics::KeyName,
301 unit: Option<metrics::Unit>,
302 _description: metrics::SharedString,
303 ) {
304 self.register_unit(key, unit);
305 }
306
307 fn describe_gauge(
308 &self,
309 key: metrics::KeyName,
310 unit: Option<metrics::Unit>,
311 _description: metrics::SharedString,
312 ) {
313 self.register_unit(key, unit);
314 }
315
316 fn describe_histogram(
317 &self,
318 key: metrics::KeyName,
319 unit: Option<metrics::Unit>,
320 _description: metrics::SharedString,
321 ) {
322 self.register_unit(key, unit);
323 }
324
325 fn register_counter(
326 &self,
327 key: &metrics::Key,
328 _metadata: &metrics::Metadata<'_>,
329 ) -> metrics::Counter {
330 Counter::from_arc(Arc::new(UnitRecorderHandle(key.clone())))
331 }
332
333 fn register_gauge(
334 &self,
335 key: &metrics::Key,
336 _metadata: &metrics::Metadata<'_>,
337 ) -> metrics::Gauge {
338 Gauge::from_arc(Arc::new(UnitRecorderHandle(key.clone())))
339 }
340
341 fn register_histogram(
342 &self,
343 key: &metrics::Key,
344 _metadata: &metrics::Metadata<'_>,
345 ) -> metrics::Histogram {
346 Histogram::from_arc(Arc::new(UnitRecorderHandle(key.clone())))
347 }
348}
349
350impl UnitRecorder {
351 fn register_unit(&self, key: metrics::KeyName, unit: Option<metrics::Unit>) {
361 let key = key.as_str().to_owned();
362 let unit = unit.unwrap_or(Unit::Count);
363 let unit = unit.as_str().to_owned();
364 let g_unit = UNITS_FOR_METRICS.get_or_init(|| Mutex::new(HashMap::new()));
365 if let Ok(mut locked) = g_unit.lock() {
366 locked.insert(key, unit);
367 }
368 }
369}
370
371fn handle_embedded_file(path: &str) -> HttpResponse {
386 match Asset::get(path) {
387 Some(content) => HttpResponse::Ok()
388 .content_type(from_path(path).first_or_octet_stream().as_ref())
389 .body(content.data.into_owned()),
390 None => HttpResponse::NotFound().body("404 Not Found"),
391 }
392}
393
394#[actix_web::get("/dashboard")]
404async fn get_dashboard() -> impl Responder {
405 handle_embedded_file("index.html")
406}
407
408#[actix_web::get("/dashboard/{_:.*}")]
422async fn get_dashboard_assets(path: web::Path<String>) -> impl Responder {
423 handle_embedded_file(path.as_str())
424}
425
426#[actix_web::get("/prometheus")]
439async fn get_prometheus_metrics() -> impl Responder {
440 debug!("Gathering prometheus metrics...");
441 let prometheus_handle = PROMETHEUS_HANDLE.get();
442 let metrics_units = UNITS_FOR_METRICS.get();
443 let mut response = HttpResponse::Ok();
444
445 if let Some(metrics_units) = metrics_units {
446 let header = serde_json::to_string(metrics_units).unwrap_or_default();
447 response.append_header(("x-dashboard-metrics-unit", header));
448 }
449
450 if let Some(handle) = prometheus_handle {
451 let metrics = handle.render();
452 return response.body(metrics);
453 }
454
455 HttpResponse::Ok().body(String::from(""))
456}
457
458fn configure_metrics_recorders_once(input: &DashboardInput) -> Result<()> {
486 if IS_CONFIGURED.load(Ordering::Acquire) {
489 debug_once!("Metrics recorder already configured. Skipping duplicate configuration.");
490 return Ok(());
491 }
492
493 if IS_CONFIGURED
495 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
496 .is_err()
497 {
498 debug_once!("Another thread configured metrics. Skipping duplicate configuration.");
500 return Ok(());
501 }
502
503 let mut prometheus_recorder = PrometheusBuilder::new();
504
505 if !input.buckets_for_metrics.is_empty() {
506 for (matcher, buckets) in input.buckets_for_metrics.iter() {
507 prometheus_recorder = prometheus_recorder
508 .set_buckets_for_metric(matcher.to_owned(), buckets)
509 .map_err(|e| anyhow::anyhow!("Failed to set buckets for metric: {}", e))?;
510 }
511 }
512
513 let prometheus_recorder = prometheus_recorder
514 .set_enable_unit_suffix(false)
515 .build_recorder();
516
517 PROMETHEUS_HANDLE
518 .set(prometheus_recorder.handle())
519 .map_err(|e| anyhow::anyhow!("Unable to set Prometheus handle: {}", e.render()))?;
520
521 let fanout = FanoutBuilder::default()
522 .add_recorder(UnitRecorder)
523 .add_recorder(prometheus_recorder)
524 .build();
525
526 tokio::spawn(async move {
527 let handle = PROMETHEUS_HANDLE.get();
528
529 if let Some(handle) = handle {
530 loop {
531 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
532 handle.run_upkeep();
533 }
534 } else {
535 debug!("Prometheus handle not set. Skipping recorder cleanup.");
536 }
537 });
538
539 metrics::set_global_recorder(fanout).map_err(|e| {
540 anyhow::anyhow!(
541 "Unable to register a recorder: {}. Did you call this function multiple times?",
542 e
543 )
544 })?;
545
546 Ok(())
547}
548
549pub fn update_rate_tracker(_counter_name: &str, value: f64, tracker_key: String) -> f64 {
554 let rate_trackers = RATE_TRACKERS.get_or_init(|| Mutex::new(HashMap::new()));
555 if let Ok(mut trackers) = rate_trackers.lock() {
556 let tracker = trackers
557 .entry(tracker_key.clone())
558 .or_insert_with(RateTracker::new);
559
560 tracker.update(value)
563 } else {
564 static LAST_VALUES: OnceLock<Mutex<HashMap<String, (f64, Instant)>>> = OnceLock::new();
567 let last_values = LAST_VALUES.get_or_init(|| Mutex::new(HashMap::new()));
568
569 if let Ok(mut values) = last_values.lock() {
570 let now = Instant::now();
571 let entry = values.entry(tracker_key).or_insert((0.0, now));
572
573 let (last_value, last_time) = *entry;
574 let elapsed = now.duration_since(last_time).as_secs_f64();
575
576 if elapsed > 0.0 && value > last_value {
577 let rate = (value - last_value) / elapsed;
578 *entry = (value, now);
579 return rate;
580 }
581
582 *entry = (value, now);
584 }
585
586 0.001 }
589}
590
591#[macro_export]
607macro_rules! counter_with_rate {
608 ($name:expr, $value:expr) => {{
609 use $crate::update_rate_tracker;
610
611 let counter = metrics::counter!($name);
613 counter.increment($value as u64);
614
615 use std::sync::OnceLock;
618 use std::sync::Mutex;
619 use std::collections::HashMap;
620
621 static COUNTER_VALUES: OnceLock<Mutex<HashMap<String, f64>>> = OnceLock::new();
622 let counter_values = COUNTER_VALUES.get_or_init(|| Mutex::new(HashMap::new()));
623
624 let absolute_value = if let Ok(mut values) = counter_values.lock() {
625 let key = format!("{}_default", $name);
626 let current = values.entry(key).or_insert(0.0);
627 *current += $value;
628 *current
629 } else {
630 $value
632 };
633
634 let rate_name = format!("{}_rate_per_sec", $name);
636 let tracker_key = format!("{}_default", $name);
637 let rate = update_rate_tracker($name, absolute_value, tracker_key);
638
639 let display_rate = if rate < 0.001 && $value > 0.0 { 0.001 } else { rate };
641 metrics::gauge!(rate_name).set(display_rate);
642 }};
643 ($name:expr, $value:expr, $label_key:expr, $label_value:expr) => {{
644 use $crate::update_rate_tracker;
645
646 let counter = metrics::counter!($name, $label_key => $label_value);
648 counter.increment($value as u64);
649
650 use std::sync::OnceLock;
652 use std::sync::Mutex;
653 use std::collections::HashMap;
654
655 static COUNTER_VALUES: OnceLock<Mutex<HashMap<String, f64>>> = OnceLock::new();
656 let counter_values = COUNTER_VALUES.get_or_init(|| Mutex::new(HashMap::new()));
657
658 let absolute_value = if let Ok(mut values) = counter_values.lock() {
659 let key = format!("{}_{}_{}", $name, $label_key, $label_value);
660 let current = values.entry(key).or_insert(0.0);
661 *current += $value;
662 *current
663 } else {
664 $value
666 };
667
668 let rate_name = format!("{}_rate_per_sec", $name);
670 let tracker_key = format!("{}_{}_{}", $name, $label_key, $label_value);
671 let rate = update_rate_tracker($name, absolute_value, tracker_key);
672
673 let display_rate = if rate < 0.001 && $value > 0.0 { 0.001 } else { rate };
675 metrics::gauge!(rate_name, $label_key => $label_value).set(display_rate);
676 }};
677}
678
679#[macro_export]
695macro_rules! absolute_counter_with_rate {
696 ($name:expr, $value:expr) => {{
697 use $crate::update_rate_tracker;
698
699 metrics::counter!($name).absolute($value as u64);
701
702 let rate_name = format!("{}_rate_per_sec", $name);
704 let tracker_key = format!("{}_default", $name);
705 let rate = update_rate_tracker($name, $value, tracker_key);
706
707 let display_rate = if rate < 0.001 && $value > 0.0 { 0.001 } else { rate };
709 metrics::gauge!(rate_name).set(display_rate);
710 }};
711 ($name:expr, $value:expr, $label_key:expr, $label_value:expr) => {{
712 use $crate::update_rate_tracker;
713
714 metrics::counter!($name, $label_key => $label_value).absolute($value as u64);
716
717 let rate_name = format!("{}_rate_per_sec", $name);
719 let tracker_key = format!("{}_{}_{}", $name, $label_key, $label_value);
720 let rate = update_rate_tracker($name, $value, tracker_key);
721
722 let display_rate = if rate < 0.001 && $value > 0.0 { 0.001 } else { rate };
724 metrics::gauge!(rate_name, $label_key => $label_value).set(display_rate);
725 }};
726}
727
728pub fn create_metrics_actx_scope(input: &DashboardInput) -> Result<Scope> {
767 configure_metrics_recorders_once(input)?;
768 let scope = web::scope("/metrics")
769 .service(get_prometheus_metrics)
770 .service(get_dashboard)
771 .service(get_dashboard_assets);
772 Ok(scope)
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778 use std::thread;
779 use std::time::Duration;
780
781 #[test]
782 fn test_rate_tracker_new() {
783 let tracker = RateTracker::new();
784 assert!(tracker.samples.is_empty());
785 assert_eq!(tracker.window_duration, Duration::from_secs(2));
786 assert_eq!(tracker.max_samples, 200);
787 }
788
789 #[test]
790 fn test_rate_tracker_default() {
791 let tracker = RateTracker::default();
792 assert!(tracker.samples.is_empty());
793 assert_eq!(tracker.window_duration, Duration::from_secs(2));
794 assert_eq!(tracker.max_samples, 200);
795 }
796
797 #[test]
798 fn test_rate_tracker_first_update() {
799 let mut tracker = RateTracker::new();
800
801 let rate = tracker.update(10.0);
802
803 assert_eq!(rate, 0.0);
805 assert_eq!(tracker.samples.len(), 1);
806 assert_eq!(tracker.samples[0].0, 10.0);
807 }
808
809 #[test]
810 fn test_rate_tracker_subsequent_updates() {
811 let mut tracker = RateTracker::new();
812
813 tracker.update(10.0);
815
816 thread::sleep(Duration::from_millis(20));
818
819 let rate = tracker.update(20.0);
821
822 assert!(rate > 0.0);
824 assert!(rate > 100.0); assert_eq!(tracker.samples.len(), 2);
826 }
827
828 #[test]
829 fn test_rate_tracker_negative_rate_clamping() {
830 let mut tracker = RateTracker::new();
831
832 tracker.update(20.0);
834
835 thread::sleep(Duration::from_millis(20));
836
837 let rate = tracker.update(10.0);
839
840 assert_eq!(rate, 0.0);
842 assert_eq!(tracker.samples.len(), 2);
843 assert_eq!(tracker.samples[1].0, 10.0);
844 }
845
846 #[test]
847 fn test_rate_tracker_high_frequency_updates() {
848 let mut tracker = RateTracker::new();
849
850 tracker.update(10.0);
852
853 let rate = tracker.update(20.0);
855
856 assert!(rate >= 0.0);
858 assert_eq!(tracker.samples.len(), 2);
859 assert_eq!(tracker.samples[1].0, 20.0);
860 }
861
862 #[test]
863 fn test_update_rate_tracker_function() {
864 let tracker_key = "test_metric_default".to_string();
865
866 let rate1 = update_rate_tracker("test_metric", 10.0, tracker_key.clone());
868 assert_eq!(rate1, 0.0); thread::sleep(Duration::from_millis(200));
871
872 let rate2 = update_rate_tracker("test_metric", 20.0, tracker_key);
874 assert!(rate2 >= 0.0); }
876
877 #[test]
878 fn test_counter_with_rate_macro_simple() {
879 let result = std::panic::catch_unwind(|| {
882 counter_with_rate!("test_counter", 1.0);
883 });
884
885 assert!(result.is_ok());
888 }
889
890 #[test]
891 fn test_counter_with_rate_macro_with_labels() {
892 let result = std::panic::catch_unwind(|| {
894 counter_with_rate!("test_counter_labeled", 2.0, "service", "api");
895 });
896
897 assert!(result.is_ok());
898 }
899
900 #[test]
901 fn test_absolute_counter_with_rate_macro_simple() {
902 let result = std::panic::catch_unwind(|| {
904 absolute_counter_with_rate!("test_absolute_counter", 42.0);
905 });
906
907 assert!(result.is_ok());
908 }
909
910 #[test]
911 fn test_absolute_counter_with_rate_macro_with_labels() {
912 let result = std::panic::catch_unwind(|| {
914 absolute_counter_with_rate!("test_absolute_counter_labeled", 100.0, "type", "batch");
915 });
916
917 assert!(result.is_ok());
918 }
919
920 #[test]
921 fn test_rate_calculation_accuracy() {
922 let mut tracker = RateTracker::new();
923
924 tracker.update(0.0);
926
927 thread::sleep(Duration::from_secs(1));
929
930 let rate = tracker.update(10.0);
932
933 assert!(
935 (rate - 10.0).abs() < 1.0,
936 "Rate {} should be close to 10.0",
937 rate
938 );
939 }
940
941 #[test]
942 fn test_multiple_rate_tracker_instances() {
943 let key1 = "metric1_default".to_string();
944 let key2 = "metric2_default".to_string();
945
946 update_rate_tracker("metric1", 10.0, key1.clone());
948 update_rate_tracker("metric2", 20.0, key2.clone());
949
950 thread::sleep(Duration::from_millis(200));
951
952 let rate1 = update_rate_tracker("metric1", 15.0, key1);
953 let rate2 = update_rate_tracker("metric2", 30.0, key2);
954
955 assert!(rate1 >= 0.0);
957 assert!(rate2 >= 0.0);
958
959 if rate1 > 0.0 && rate2 > 0.0 {
962 assert!(
963 (rate2 / rate1 - 2.0).abs() < 0.5,
964 "Rate2 ({}) should be approximately twice rate1 ({})",
965 rate2,
966 rate1
967 );
968 }
969 }
970
971 #[test]
972 fn test_dashboard_input_default() {
973 let input = DashboardInput::default();
974 assert!(input.buckets_for_metrics.is_empty());
975 }
976
977 #[test]
978 fn test_dashboard_input_with_buckets() {
979 let buckets = &[1.0, 5.0, 10.0];
980 let input = DashboardInput {
981 buckets_for_metrics: vec![(
982 metrics_exporter_prometheus::Matcher::Full("test_metric".to_string()),
983 buckets,
984 )],
985 };
986
987 assert_eq!(input.buckets_for_metrics.len(), 1);
988 assert_eq!(input.buckets_for_metrics[0].1, buckets);
989 }
990
991 #[test]
992 fn test_rate_tracker_zero_value_update() {
993 let mut tracker = RateTracker::new();
994
995 thread::sleep(Duration::from_millis(150));
996
997 let rate = tracker.update(0.0);
999
1000 assert_eq!(rate, 0.0);
1002 assert_eq!(tracker.samples.len(), 1);
1003 assert_eq!(tracker.samples[0].0, 0.0);
1004 }
1005
1006 #[test]
1007 fn test_rate_tracker_large_values() {
1008 let mut tracker = RateTracker::new();
1009
1010 tracker.update(500_000.0);
1012
1013 thread::sleep(Duration::from_millis(20));
1014
1015 let large_value = 1_000_000.0;
1017 let rate = tracker.update(large_value);
1018
1019 assert!(rate > 0.0);
1020 assert_eq!(tracker.samples.len(), 2);
1021 assert_eq!(tracker.samples[1].0, large_value);
1022 }
1023
1024 #[test]
1025 fn test_rate_tracker_fractional_values() {
1026 let mut tracker = RateTracker::new();
1027
1028 tracker.update(1.5);
1030
1031 thread::sleep(Duration::from_millis(20));
1032
1033 let rate = tracker.update(3.7);
1035
1036 assert!(rate > 0.0);
1038 assert_eq!(tracker.samples.len(), 2);
1039 assert_eq!(tracker.samples[1].0, 3.7);
1040 }
1041
1042 #[test]
1043 fn test_update_rate_tracker_concurrent_access() {
1044 use std::thread;
1045
1046 let handles: Vec<_> = (0..5)
1047 .map(|i| {
1048 thread::spawn(move || {
1049 let tracker_key = format!("concurrent_test_{}", i);
1050
1051 update_rate_tracker("concurrent_metric", 10.0, tracker_key.clone());
1053
1054 thread::sleep(Duration::from_millis(200));
1055
1056 update_rate_tracker("concurrent_metric", 20.0, tracker_key)
1057 })
1058 })
1059 .collect();
1060
1061 for handle in handles {
1063 let rate = handle.join().expect("Thread should complete successfully");
1064 assert!(rate >= 0.0);
1065 }
1066 }
1067
1068 #[test]
1069 fn test_rate_tracker_consistent_timestamps() {
1070 let mut tracker = RateTracker::new();
1071
1072 let start_time = std::time::Instant::now();
1073
1074 thread::sleep(Duration::from_millis(20));
1075
1076 tracker.update(5.0);
1077
1078 assert_eq!(tracker.samples.len(), 1);
1080 assert!(tracker.samples[0].1 > start_time);
1081 }
1082}