api_tools/server/axum/layers/
prometheus.rs

1//! Prometheus' metrics layer
2
3use axum::body::Body;
4use axum::extract::MatchedPath;
5use axum::http::Request;
6use axum::response::Response;
7use bytesize::ByteSize;
8use futures::future::BoxFuture;
9use metrics::{counter, gauge, histogram};
10use std::fmt;
11use std::path::PathBuf;
12use std::task::{Context, Poll};
13use std::time::Instant;
14use sysinfo::{Disks, System};
15use tower::{Layer, Service};
16
/// Prometheus metrics layer for Axum.
///
/// Wraps a service in [`PrometheusMiddleware`], handing it a copy of the
/// service name and of the disk mount points configured here.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PrometheusLayer {
    /// Service name, attached to the emitted metrics as the `service` label
    pub service_name: String,

    /// Disk mount points to monitor
    pub disk_mount_points: Vec<PathBuf>,
}
26
27impl<S> Layer<S> for PrometheusLayer {
28    type Service = PrometheusMiddleware<S>;
29
30    fn layer(&self, inner: S) -> Self::Service {
31        PrometheusMiddleware {
32            inner,
33            service_name: self.service_name.clone(),
34            disk_mount_points: self.disk_mount_points.clone(),
35        }
36    }
37}
38
/// Middleware produced by [`PrometheusLayer`].
///
/// Holds the wrapped service together with the service name and the disk
/// mount points used when emitting metrics.
#[derive(Debug, Clone)]
pub struct PrometheusMiddleware<S> {
    /// Wrapped inner service
    inner: S,

    /// Service name, attached to the emitted metrics as the `service` label
    service_name: String,

    /// Disk mount points whose space is summed into the disk gauges
    disk_mount_points: Vec<PathBuf>,
}
45
46impl<S> Service<Request<Body>> for PrometheusMiddleware<S>
47where
48    S: Service<Request<Body>, Response = Response> + Send + 'static,
49    S::Future: Send + 'static,
50{
51    type Response = S::Response;
52    type Error = S::Error;
53    // `BoxFuture` is a type alias for `Pin<Box<dyn Future + Send + 'a>>`
54    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
55
56    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        self.inner.poll_ready(cx)
58    }
59
60    fn call(&mut self, request: Request<Body>) -> Self::Future {
61        let path = if let Some(matched_path) = request.extensions().get::<MatchedPath>() {
62            matched_path.as_str().to_owned()
63        } else {
64            request.uri().path().to_owned()
65        };
66        let method = request.method().to_string();
67        let service_name = self.service_name.clone();
68        let disk_mount_points = self.disk_mount_points.clone();
69
70        let start = Instant::now();
71        let future = self.inner.call(request);
72        Box::pin(async move {
73            let response = future.await?;
74
75            // Exclude metrics endpoint
76            if path != "/metrics" {
77                let latency = start.elapsed().as_secs_f64();
78                let status = response.status().as_u16().to_string();
79                let labels = [
80                    ("method", method),
81                    ("path", path),
82                    ("service", service_name.clone()),
83                    ("status", status),
84                ];
85
86                counter!("http_requests_total", &labels).increment(1);
87                histogram!("http_requests_duration_seconds", &labels).record(latency);
88            }
89
90            // System metrics
91            let system_metrics = SystemMetrics::new(&disk_mount_points).await;
92            system_metrics.add_metrics(service_name);
93
94            Ok(response)
95        })
96    }
97}
98
/// Snapshot of the host resource usage exported as Prometheus gauges.
#[derive(Debug, Clone)]
struct SystemMetrics {
    /// Global CPU usage, in percent
    cpu_usage: f32,

    /// Total amount of memory, in bytes
    total_memory: u64,

    /// Amount of memory in use, in bytes
    used_memory: u64,

    /// Total amount of swap space, in bytes
    total_swap: u64,

    /// Amount of swap space in use, in bytes
    used_swap: u64,

    /// Total disk space, in bytes, summed over the monitored mount points
    total_disks_space: u64,

    /// Used disk space, in bytes, summed over the monitored mount points
    used_disks_space: u64,
}
122
123impl SystemMetrics {
124    /// Creates a new `SystemMetrics` instance, refreshing the system information
125    async fn new(disk_mount_points: &[PathBuf]) -> Self {
126        let mut sys = System::new_all();
127
128        // CPU
129        sys.refresh_cpu_usage();
130        let mut cpu_usage = sys.global_cpu_usage();
131        tokio::time::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL).await;
132        sys.refresh_cpu_usage();
133        cpu_usage += sys.global_cpu_usage();
134        cpu_usage /= 2.0;
135
136        // Memory
137        sys.refresh_memory();
138        let total_memory = sys.total_memory();
139        let used_memory = sys.used_memory();
140
141        // Swap
142        let total_swap = sys.total_swap();
143        let used_swap = sys.used_swap();
144
145        // Disks
146        let disks = Disks::new_with_refreshed_list();
147        let mut total_disks_space = 0;
148        let mut used_disks_space = 0;
149        for disk in &disks {
150            if disk_mount_points.contains(&disk.mount_point().to_path_buf()) {
151                total_disks_space += disk.total_space();
152                used_disks_space += disk.total_space() - disk.available_space();
153            }
154        }
155
156        Self {
157            cpu_usage,
158            total_memory,
159            used_memory,
160            total_swap,
161            used_swap,
162            total_disks_space,
163            used_disks_space,
164        }
165    }
166
167    /// Adds the system metrics to Prometheus gauges
168    fn add_metrics(&self, service_name: String) {
169        gauge!("system_cpu_usage", "service" => service_name.clone()).set(self.cpu_usage);
170        gauge!("system_total_memory", "service" => service_name.clone()).set(self.total_memory as f64);
171        gauge!("system_used_memory", "service" => service_name.clone()).set(self.used_memory as f64);
172        gauge!("system_total_swap", "service" => service_name.clone()).set(self.total_swap as f64);
173        gauge!("system_used_swap", "service" => service_name.clone()).set(self.used_swap as f64);
174        gauge!("system_total_disks_space", "service" => service_name.clone()).set(self.total_disks_space as f64);
175        gauge!("system_used_disks_usage", "service" => service_name).set(self.used_disks_space as f64);
176    }
177}
178
179impl fmt::Display for SystemMetrics {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        write!(
182            f,
183            "CPUs:       {:.1}%\n\
184             Memory:     {} / {}\n\
185             Swap:       {} / {}\n\
186             Disk usage: {} / {}",
187            self.cpu_usage,
188            ByteSize::b(self.used_memory),
189            ByteSize::b(self.total_memory),
190            ByteSize::b(self.used_swap),
191            ByteSize::b(self.total_swap),
192            ByteSize::b(self.used_disks_space),
193            ByteSize::b(self.total_disks_space),
194        )
195    }
196}