1use std::collections::HashMap;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::application::Application;
28use crate::middleware::Middleware;
29use crate::request::Request;
30use crate::response::Response;
31use crate::server::ConnectionInfo;
32
/// Readiness flag for the server; starts out `false`.
///
/// Nothing in the visible part of this module writes it — presumably the server start-up code
/// sets it (verify against the caller).
pub static SERVER_READY: AtomicBool = AtomicBool::new(false);

/// Process-wide request counter, advanced by [`record_request`].
pub static REQUESTS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Process-wide error counter, advanced by [`record_error`].
pub static ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Open-connection gauge, raised by [`connection_open`] and lowered by [`connection_close`].
///
/// The gauge is signed, so an unmatched close shows up as a negative value.
pub static ACTIVE_CONNECTIONS: AtomicI64 = AtomicI64::new(0);

/// Adds one to `counter` with relaxed ordering.
fn bump(counter: &AtomicU64) {
    counter.fetch_add(1, Ordering::Relaxed);
}

/// Moves the open-connection gauge by `delta` with relaxed ordering.
fn shift_connections(delta: i64) {
    ACTIVE_CONNECTIONS.fetch_add(delta, Ordering::Relaxed);
}

/// Counts one request in [`REQUESTS_TOTAL`].
pub fn record_request() {
    bump(&REQUESTS_TOTAL);
}

/// Counts one error in [`ERRORS_TOTAL`].
pub fn record_error() {
    bump(&ERRORS_TOTAL);
}

/// Registers a newly opened connection in [`ACTIVE_CONNECTIONS`].
pub fn connection_open() {
    shift_connections(1);
}

/// Registers a closed connection in [`ACTIVE_CONNECTIONS`].
pub fn connection_close() {
    shift_connections(-1);
}
65
/// Inclusive upper bounds, in seconds, of the finite latency buckets, in ascending order.
const BUCKET_BOUNDS: [f64; 11] = [
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Cumulative latency histogram with one slot per entry of [`BUCKET_BOUNDS`].
struct HistogramEntry {
    /// `buckets[i]` is the number of observations that were `<= BUCKET_BOUNDS[i]`.
    buckets: [u64; BUCKET_BOUNDS.len()],
    /// Sum of every observed value, in seconds.
    sum: f64,
    /// Number of observations, including those above the largest bound.
    count: u64,
}

impl HistogramEntry {
    /// Creates a histogram with every bucket, the sum and the count at zero.
    fn new() -> Self {
        Self {
            buckets: [0; BUCKET_BOUNDS.len()],
            sum: 0.0,
            count: 0,
        }
    }

    /// Records one observation of `secs` seconds.
    ///
    /// Buckets are cumulative: the observation is added to every bucket whose bound is at
    /// least `secs`, so a value above the largest bound only affects `sum` and `count`.
    fn observe(&mut self, secs: f64) {
        self.count += 1;
        self.sum += secs;
        self.buckets
            .iter_mut()
            .zip(BUCKET_BOUNDS.iter())
            .filter(|(_, upper)| secs <= **upper)
            .for_each(|(slot, _)| *slot += 1);
    }
}
94
95struct RouteEntry {
96 counts: HashMap<i16, u64>,
97 latency: HistogramEntry,
98}
99
100impl RouteEntry {
101 fn new() -> Self {
102 RouteEntry { counts: HashMap::new(), latency: HistogramEntry::new() }
103 }
104}
105
106struct RouteStore {
107 entries: HashMap<(String, String), RouteEntry>,
109}
110
111static ROUTE_STORE: OnceLock<Mutex<RouteStore>> = OnceLock::new();
112
113fn route_store() -> &'static Mutex<RouteStore> {
114 ROUTE_STORE.get_or_init(|| Mutex::new(RouteStore { entries: HashMap::new() }))
115}
116
117pub fn record_route(method: &str, path: &str, status: i16, elapsed_secs: f64) {
122 let key = (method.to_string(), path.to_string());
123 let mut guard = route_store().lock().unwrap();
124 let entry = guard.entries.entry(key).or_insert_with(RouteEntry::new);
125 *entry.counts.entry(status).or_insert(0) += 1;
126 entry.latency.observe(elapsed_secs);
127}
128
/// Returns the part of `uri` before the first `?`, or all of `uri` when it contains none.
fn strip_query(uri: &str) -> &str {
    uri.split_once('?').map_or(uri, |(before, _query)| before)
}
136
137pub struct MetricsLayer;
152
153impl Middleware for MetricsLayer {
154 fn handle(
155 &self,
156 request: &Request,
157 connection: &ConnectionInfo,
158 next: &dyn Application,
159 ) -> Result<Response, String> {
160 let start = Instant::now();
161 let result = next.execute(request, connection);
162 let elapsed = start.elapsed().as_secs_f64();
163
164 let path = strip_query(&request.request_uri).to_string();
165 let status = match &result {
166 Ok(r) => r.status_code,
167 Err(_) => 500,
168 };
169 record_route(&request.method, &path, status, elapsed);
170
171 result
172 }
173}
174
175#[cfg(test)]
178mod tests;
179
180pub fn prometheus_text() -> String {
182 let requests = REQUESTS_TOTAL.load(Ordering::Relaxed);
183 let errors = ERRORS_TOTAL.load(Ordering::Relaxed);
184 let active = ACTIVE_CONNECTIONS.load(Ordering::Relaxed);
185
186 let mut out = format!(
187 "# HELP rws_requests_total Total HTTP requests handled\n\
188 # TYPE rws_requests_total counter\n\
189 rws_requests_total {}\n\n\
190 # HELP rws_errors_total HTTP requests that returned an application error\n\
191 # TYPE rws_errors_total counter\n\
192 rws_errors_total {}\n\n\
193 # HELP rws_active_connections Currently open connections\n\
194 # TYPE rws_active_connections gauge\n\
195 rws_active_connections {}\n",
196 requests, errors, active
197 );
198
199 let route_text = route_prometheus_text();
200 if !route_text.is_empty() {
201 out.push('\n');
202 out.push_str(&route_text);
203 }
204
205 out
206}
207
208fn route_prometheus_text() -> String {
209 let guard = route_store().lock().unwrap();
210 if guard.entries.is_empty() {
211 return String::new();
212 }
213
214 let mut keys: Vec<&(String, String)> = guard.entries.keys().collect();
216 keys.sort();
217
218 let mut out = String::new();
219
220 out.push_str("# HELP rws_route_requests_total Total requests handled per route\n");
222 out.push_str("# TYPE rws_route_requests_total counter\n");
223 for key in &keys {
224 let entry = &guard.entries[key];
225 let (method, path) = key;
226 let mut statuses: Vec<i16> = entry.counts.keys().cloned().collect();
227 statuses.sort();
228 for status in statuses {
229 let count = entry.counts[&status];
230 out.push_str(&format!(
231 "rws_route_requests_total{{method=\"{}\",path=\"{}\",status=\"{}\"}} {}\n",
232 method, path, status, count,
233 ));
234 }
235 }
236
237 out.push('\n');
239 out.push_str("# HELP rws_route_duration_seconds Request duration in seconds per route\n");
240 out.push_str("# TYPE rws_route_duration_seconds histogram\n");
241 for key in &keys {
242 let entry = &guard.entries[key];
243 let (method, path) = key;
244 let lat = &entry.latency;
245
246 for (i, &upper) in BUCKET_BOUNDS.iter().enumerate() {
247 out.push_str(&format!(
248 "rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"{}\"}} {}\n",
249 method, path, upper, lat.buckets[i],
250 ));
251 }
252 out.push_str(&format!(
253 "rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"+Inf\"}} {}\n",
254 method, path, lat.count,
255 ));
256 out.push_str(&format!(
257 "rws_route_duration_seconds_sum{{method=\"{}\",path=\"{}\"}} {:.9}\n",
258 method, path, lat.sum,
259 ));
260 out.push_str(&format!(
261 "rws_route_duration_seconds_count{{method=\"{}\",path=\"{}\"}} {}\n",
262 method, path, lat.count,
263 ));
264 }
265
266 out
267}