1use std::collections::HashMap;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
24use std::sync::{Mutex, OnceLock};
25use std::time::Instant;
26
27use crate::application::Application;
28use crate::middleware::Middleware;
29use crate::request::Request;
30use crate::response::Response;
31use crate::server::ConnectionInfo;
32
33pub static SERVER_READY: AtomicBool = AtomicBool::new(false);
40
41pub static REQUESTS_TOTAL: AtomicU64 = AtomicU64::new(0);
43
44pub static ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);
46
47pub static ACTIVE_CONNECTIONS: AtomicI64 = AtomicI64::new(0);
49
50pub static THREAD_POOL_QUEUED: AtomicI64 = AtomicI64::new(0);
52
53pub fn record_request() {
54 REQUESTS_TOTAL.fetch_add(1, Ordering::Relaxed);
55}
56
57pub fn record_error() {
58 ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
59}
60
61pub fn connection_open() {
62 ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
63}
64
65pub fn connection_close() {
66 ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
67}
68
69const BUCKET_BOUNDS: [f64; 11] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
74
75struct HistogramEntry {
76 buckets: [u64; 11],
78 sum: f64,
79 count: u64,
80}
81
82impl HistogramEntry {
83 fn new() -> Self {
84 HistogramEntry { buckets: [0; 11], sum: 0.0, count: 0 }
85 }
86
87 fn observe(&mut self, secs: f64) {
88 for (i, &upper) in BUCKET_BOUNDS.iter().enumerate() {
89 if secs <= upper {
90 self.buckets[i] += 1;
91 }
92 }
93 self.sum += secs;
94 self.count += 1;
95 }
96}
97
98struct RouteEntry {
99 counts: HashMap<i16, u64>,
100 latency: HistogramEntry,
101}
102
103impl RouteEntry {
104 fn new() -> Self {
105 RouteEntry { counts: HashMap::new(), latency: HistogramEntry::new() }
106 }
107}
108
109struct RouteStore {
110 entries: HashMap<(String, String), RouteEntry>,
112}
113
114static ROUTE_STORE: OnceLock<Mutex<RouteStore>> = OnceLock::new();
115
116fn route_store() -> &'static Mutex<RouteStore> {
117 ROUTE_STORE.get_or_init(|| Mutex::new(RouteStore { entries: HashMap::new() }))
118}
119
120pub fn record_route(method: &str, path: &str, status: i16, elapsed_secs: f64) {
125 let key = (method.to_string(), path.to_string());
126 let mut guard = route_store().lock().unwrap();
127 let entry = guard.entries.entry(key).or_insert_with(RouteEntry::new);
128 *entry.counts.entry(status).or_insert(0) += 1;
129 entry.latency.observe(elapsed_secs);
130}
131
132fn strip_query(uri: &str) -> &str {
134 match uri.find('?') {
135 Some(i) => &uri[..i],
136 None => uri,
137 }
138}
139
140pub struct MetricsLayer;
155
156impl Middleware for MetricsLayer {
157 fn handle(
158 &self,
159 request: &Request,
160 connection: &ConnectionInfo,
161 next: &dyn Application,
162 ) -> Result<Response, String> {
163 let start = Instant::now();
164 let result = next.execute(request, connection);
165 let elapsed = start.elapsed().as_secs_f64();
166
167 let path = strip_query(&request.request_uri).to_string();
168 let status = match &result {
169 Ok(r) => r.status_code,
170 Err(_) => 500,
171 };
172 record_route(&request.method, &path, status, elapsed);
173
174 result
175 }
176}
177
178#[cfg(test)]
181mod tests;
182
183pub fn prometheus_text() -> String {
185 let requests = REQUESTS_TOTAL.load(Ordering::Relaxed);
186 let errors = ERRORS_TOTAL.load(Ordering::Relaxed);
187 let active = ACTIVE_CONNECTIONS.load(Ordering::Relaxed);
188
189 let mut out = format!(
190 "# HELP rws_requests_total Total HTTP requests handled\n\
191 # TYPE rws_requests_total counter\n\
192 rws_requests_total {}\n\n\
193 # HELP rws_errors_total HTTP requests that returned an application error\n\
194 # TYPE rws_errors_total counter\n\
195 rws_errors_total {}\n\n\
196 # HELP rws_active_connections Currently open connections\n\
197 # TYPE rws_active_connections gauge\n\
198 rws_active_connections {}\n",
199 requests, errors, active
200 );
201
202 let route_text = route_prometheus_text();
203 if !route_text.is_empty() {
204 out.push('\n');
205 out.push_str(&route_text);
206 }
207
208 out
209}
210
211fn route_prometheus_text() -> String {
212 let guard = route_store().lock().unwrap();
213 if guard.entries.is_empty() {
214 return String::new();
215 }
216
217 let mut keys: Vec<&(String, String)> = guard.entries.keys().collect();
219 keys.sort();
220
221 let mut out = String::new();
222
223 out.push_str("# HELP rws_route_requests_total Total requests handled per route\n");
225 out.push_str("# TYPE rws_route_requests_total counter\n");
226 for key in &keys {
227 let entry = &guard.entries[key];
228 let (method, path) = key;
229 let mut statuses: Vec<i16> = entry.counts.keys().cloned().collect();
230 statuses.sort();
231 for status in statuses {
232 let count = entry.counts[&status];
233 out.push_str(&format!(
234 "rws_route_requests_total{{method=\"{}\",path=\"{}\",status=\"{}\"}} {}\n",
235 method, path, status, count,
236 ));
237 }
238 }
239
240 out.push('\n');
242 out.push_str("# HELP rws_route_duration_seconds Request duration in seconds per route\n");
243 out.push_str("# TYPE rws_route_duration_seconds histogram\n");
244 for key in &keys {
245 let entry = &guard.entries[key];
246 let (method, path) = key;
247 let lat = &entry.latency;
248
249 for (i, &upper) in BUCKET_BOUNDS.iter().enumerate() {
250 out.push_str(&format!(
251 "rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"{}\"}} {}\n",
252 method, path, upper, lat.buckets[i],
253 ));
254 }
255 out.push_str(&format!(
256 "rws_route_duration_seconds_bucket{{method=\"{}\",path=\"{}\",le=\"+Inf\"}} {}\n",
257 method, path, lat.count,
258 ));
259 out.push_str(&format!(
260 "rws_route_duration_seconds_sum{{method=\"{}\",path=\"{}\"}} {:.9}\n",
261 method, path, lat.sum,
262 ));
263 out.push_str(&format!(
264 "rws_route_duration_seconds_count{{method=\"{}\",path=\"{}\"}} {}\n",
265 method, path, lat.count,
266 ));
267 }
268
269 out
270}