api_tools/server/axum/layers/
prometheus.rs1use axum::body::Body;
4use axum::extract::MatchedPath;
5use axum::http::Request;
6use axum::response::Response;
7use bytesize::ByteSize;
8use futures::future::BoxFuture;
9use metrics::{counter, gauge, histogram};
10use std::fmt;
11use std::path::PathBuf;
12use std::task::{Context, Poll};
13use std::time::Instant;
14use sysinfo::{Disks, System};
15use tower::{Layer, Service};
16
17#[derive(Clone)]
19pub struct PrometheusLayer {
20 pub service_name: String,
22
23 pub disk_mount_points: Vec<PathBuf>,
25}
26
27impl<S> Layer<S> for PrometheusLayer {
28 type Service = PrometheusMiddleware<S>;
29
30 fn layer(&self, inner: S) -> Self::Service {
31 PrometheusMiddleware {
32 inner,
33 service_name: self.service_name.clone(),
34 disk_mount_points: self.disk_mount_points.clone(),
35 }
36 }
37}
38
39#[derive(Clone)]
40pub struct PrometheusMiddleware<S> {
41 inner: S,
42 service_name: String,
43 disk_mount_points: Vec<PathBuf>,
44}
45
46impl<S> Service<Request<Body>> for PrometheusMiddleware<S>
47where
48 S: Service<Request<Body>, Response = Response> + Send + 'static,
49 S::Future: Send + 'static,
50{
51 type Response = S::Response;
52 type Error = S::Error;
53 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
55
56 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 self.inner.poll_ready(cx)
58 }
59
60 fn call(&mut self, request: Request<Body>) -> Self::Future {
61 let path = if let Some(matched_path) = request.extensions().get::<MatchedPath>() {
62 matched_path.as_str().to_owned()
63 } else {
64 request.uri().path().to_owned()
65 };
66 let method = request.method().to_string();
67 let service_name = self.service_name.clone();
68 let disk_mount_points = self.disk_mount_points.clone();
69
70 let start = Instant::now();
71 let future = self.inner.call(request);
72 Box::pin(async move {
73 let response = future.await?;
74
75 if path != "/metrics" {
77 let latency = start.elapsed().as_secs_f64();
78 let status = response.status().as_u16().to_string();
79 let labels = [
80 ("method", method),
81 ("path", path),
82 ("service", service_name.clone()),
83 ("status", status),
84 ];
85
86 counter!("http_requests_total", &labels).increment(1);
87 histogram!("http_requests_duration_seconds", &labels).record(latency);
88 }
89
90 let system_metrics = SystemMetrics::new(&disk_mount_points).await;
92 system_metrics.add_metrics(service_name);
93
94 Ok(response)
95 })
96 }
97}
98
99#[derive(Debug, Clone)]
100struct SystemMetrics {
101 cpu_usage: f32,
103
104 total_memory: u64,
106
107 used_memory: u64,
109
110 total_swap: u64,
112
113 used_swap: u64,
115
116 total_disks_space: u64,
118
119 used_disks_space: u64,
121}
122
123impl SystemMetrics {
124 async fn new(disk_mount_points: &[PathBuf]) -> Self {
126 let mut sys = System::new_all();
127
128 sys.refresh_cpu_usage();
130 let mut cpu_usage = sys.global_cpu_usage();
131 tokio::time::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL).await;
132 sys.refresh_cpu_usage();
133 cpu_usage += sys.global_cpu_usage();
134 cpu_usage /= 2.0;
135
136 sys.refresh_memory();
138 let total_memory = sys.total_memory();
139 let used_memory = sys.used_memory();
140
141 let total_swap = sys.total_swap();
143 let used_swap = sys.used_swap();
144
145 let disks = Disks::new_with_refreshed_list();
147 let mut total_disks_space = 0;
148 let mut used_disks_space = 0;
149 for disk in &disks {
150 if disk_mount_points.contains(&disk.mount_point().to_path_buf()) {
151 total_disks_space += disk.total_space();
152 used_disks_space += disk.total_space() - disk.available_space();
153 }
154 }
155
156 Self {
157 cpu_usage,
158 total_memory,
159 used_memory,
160 total_swap,
161 used_swap,
162 total_disks_space,
163 used_disks_space,
164 }
165 }
166
167 fn add_metrics(&self, service_name: String) {
169 gauge!("system_cpu_usage", "service" => service_name.clone()).set(self.cpu_usage);
170 gauge!("system_total_memory", "service" => service_name.clone()).set(self.total_memory as f64);
171 gauge!("system_used_memory", "service" => service_name.clone()).set(self.used_memory as f64);
172 gauge!("system_total_swap", "service" => service_name.clone()).set(self.total_swap as f64);
173 gauge!("system_used_swap", "service" => service_name.clone()).set(self.used_swap as f64);
174 gauge!("system_total_disks_space", "service" => service_name.clone()).set(self.total_disks_space as f64);
175 gauge!("system_used_disks_usage", "service" => service_name).set(self.used_disks_space as f64);
176 }
177}
178
179impl fmt::Display for SystemMetrics {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 write!(
182 f,
183 "CPUs: {:.1}%\n\
184 Memory: {} / {}\n\
185 Swap: {} / {}\n\
186 Disk usage: {} / {}",
187 self.cpu_usage,
188 ByteSize::b(self.used_memory),
189 ByteSize::b(self.total_memory),
190 ByteSize::b(self.used_swap),
191 ByteSize::b(self.total_swap),
192 ByteSize::b(self.used_disks_space),
193 ByteSize::b(self.total_disks_space),
194 )
195 }
196}