1use crate::error::{ForgeError, Result};
9use prometheus::{
10 Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
11};
12use std::sync::Arc;
13use tracing::info;
14
15pub struct ForgeMetrics {
17 registry: Registry,
18
19 pub jobs_submitted: Counter,
21 pub jobs_running: Gauge,
22 pub jobs_completed: CounterVec,
23
24 pub scale_events: CounterVec,
26 pub current_instances: GaugeVec,
27
28 pub route_requests: CounterVec,
30 pub route_latency: HistogramVec,
31
32 pub cpu_utilization: GaugeVec,
34 pub memory_utilization: GaugeVec,
35
36 pub requests_total: CounterVec,
38 pub request_duration: HistogramVec,
39 pub active_connections: Gauge,
40}
41
42impl ForgeMetrics {
43 pub fn new() -> Result<Self> {
45 let registry = Registry::new();
46
47 let jobs_submitted = Counter::new("forge_jobs_submitted_total", "Total jobs submitted")?;
49 let jobs_running = Gauge::new("forge_jobs_running", "Currently running jobs")?;
50 let jobs_completed = CounterVec::new(
51 Opts::new("forge_jobs_completed_total", "Total jobs completed"),
52 &["status"],
53 )?;
54
55 let scale_events = CounterVec::new(
57 Opts::new("forge_scale_events_total", "Total scaling events"),
58 &["job", "direction"],
59 )?;
60 let current_instances = GaugeVec::new(
61 Opts::new("forge_instances", "Current instance count"),
62 &["job", "group"],
63 )?;
64
65 let route_requests = CounterVec::new(
67 Opts::new("forge_route_requests_total", "Total routing requests"),
68 &["router", "expert"],
69 )?;
70 let route_latency = HistogramVec::new(
71 HistogramOpts::new("forge_route_latency_seconds", "Routing latency")
72 .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
73 &["router"],
74 )?;
75
76 let cpu_utilization = GaugeVec::new(
78 Opts::new("forge_cpu_utilization", "CPU utilization (0-1)"),
79 &["job", "group"],
80 )?;
81 let memory_utilization = GaugeVec::new(
82 Opts::new("forge_memory_utilization", "Memory utilization (0-1)"),
83 &["job", "group"],
84 )?;
85
86 let requests_total = CounterVec::new(
88 Opts::new("forge_http_requests_total", "Total HTTP requests"),
89 &["method", "path", "status"],
90 )?;
91 let request_duration = HistogramVec::new(
92 HistogramOpts::new("forge_http_request_duration_seconds", "HTTP request duration")
93 .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]),
94 &["method", "path"],
95 )?;
96 let active_connections = Gauge::new("forge_active_connections", "Active connections")?;
97
98 registry.register(Box::new(jobs_submitted.clone()))?;
100 registry.register(Box::new(jobs_running.clone()))?;
101 registry.register(Box::new(jobs_completed.clone()))?;
102 registry.register(Box::new(scale_events.clone()))?;
103 registry.register(Box::new(current_instances.clone()))?;
104 registry.register(Box::new(route_requests.clone()))?;
105 registry.register(Box::new(route_latency.clone()))?;
106 registry.register(Box::new(cpu_utilization.clone()))?;
107 registry.register(Box::new(memory_utilization.clone()))?;
108 registry.register(Box::new(requests_total.clone()))?;
109 registry.register(Box::new(request_duration.clone()))?;
110 registry.register(Box::new(active_connections.clone()))?;
111
112 Ok(Self {
113 registry,
114 jobs_submitted,
115 jobs_running,
116 jobs_completed,
117 scale_events,
118 current_instances,
119 route_requests,
120 route_latency,
121 cpu_utilization,
122 memory_utilization,
123 requests_total,
124 request_duration,
125 active_connections,
126 })
127 }
128
129 pub fn registry(&self) -> &Registry {
131 &self.registry
132 }
133
134 pub fn record_job_submitted(&self) {
136 self.jobs_submitted.inc();
137 self.jobs_running.inc();
138 }
139
140 pub fn record_job_completed(&self, success: bool) {
142 self.jobs_running.dec();
143 let status = if success { "success" } else { "failed" };
144 self.jobs_completed.with_label_values(&[status]).inc();
145 }
146
147 pub fn record_scale_event(&self, job: &str, direction: &str) {
149 self.scale_events.with_label_values(&[job, direction]).inc();
150 }
151
152 pub fn set_instances(&self, job: &str, group: &str, count: f64) {
154 self.current_instances
155 .with_label_values(&[job, group])
156 .set(count);
157 }
158
159 pub fn record_route(&self, router: &str, expert: usize, latency_secs: f64) {
161 let expert_str = expert.to_string();
162 self.route_requests
163 .with_label_values(&[router, &expert_str])
164 .inc();
165 self.route_latency
166 .with_label_values(&[router])
167 .observe(latency_secs);
168 }
169
170 pub fn set_utilization(&self, job: &str, group: &str, cpu: f64, memory: f64) {
172 self.cpu_utilization
173 .with_label_values(&[job, group])
174 .set(cpu);
175 self.memory_utilization
176 .with_label_values(&[job, group])
177 .set(memory);
178 }
179
180 pub fn record_http_request(&self, method: &str, path: &str, status: u16, duration_secs: f64) {
182 let status_str = status.to_string();
183 self.requests_total
184 .with_label_values(&[method, path, &status_str])
185 .inc();
186 self.request_duration
187 .with_label_values(&[method, path])
188 .observe(duration_secs);
189 }
190
191 pub fn gather_text(&self) -> Result<String> {
193 use prometheus::Encoder;
194 let encoder = prometheus::TextEncoder::new();
195 let metric_families = self.registry.gather();
196 let mut buffer = Vec::new();
197 encoder
198 .encode(&metric_families, &mut buffer)
199 .map_err(|e| ForgeError::metrics(format!("Encode error: {}", e)))?;
200 String::from_utf8(buffer).map_err(|e| ForgeError::metrics(format!("UTF8 error: {}", e)))
201 }
202}
203
204impl Default for ForgeMetrics {
205 fn default() -> Self {
206 match Self::new() {
207 Ok(m) => m,
208 Err(e) => {
209 tracing::error!(error = %e, "Failed to create metrics, using stub");
210 panic!("ForgeMetrics::default() failed: {}", e);
211 }
212 }
213 }
214}
215
216pub trait MetricsHook: Send + Sync {
218 fn collect(&self, metrics: &ForgeMetrics);
220
221 fn name(&self) -> &str;
223}
224
225pub struct MetricsExporter {
227 metrics: Arc<ForgeMetrics>,
228 hooks: Vec<Box<dyn MetricsHook>>,
229}
230
231impl MetricsExporter {
232 pub fn new(metrics: Arc<ForgeMetrics>) -> Self {
234 Self {
235 metrics,
236 hooks: Vec::new(),
237 }
238 }
239
240 pub fn register_hook(&mut self, hook: Box<dyn MetricsHook>) {
242 info!(hook = %hook.name(), "Registered metrics hook");
243 self.hooks.push(hook);
244 }
245
246 pub fn collect(&self) {
248 for hook in &self.hooks {
249 hook.collect(&self.metrics);
250 }
251 }
252
253 pub fn export(&self) -> Result<String> {
255 self.collect();
256 self.metrics.gather_text()
257 }
258
259 pub fn handler(
261 metrics: Arc<ForgeMetrics>,
262 ) -> impl Fn() -> std::pin::Pin<
263 Box<dyn std::future::Future<Output = axum::response::Response> + Send>,
264 > + Clone {
265 move || {
266 let metrics = Arc::clone(&metrics);
267 Box::pin(async move {
268 match metrics.gather_text() {
269 Ok(text) => axum::response::Response::builder()
270 .header("Content-Type", "text/plain; charset=utf-8")
271 .body(axum::body::Body::from(text))
272 .unwrap_or_else(|_| {
273 axum::response::Response::new(axum::body::Body::from("Internal error"))
274 }),
275 Err(e) => axum::response::Response::builder()
276 .status(500)
277 .body(axum::body::Body::from(format!("Error: {}", e)))
278 .unwrap_or_else(|_| {
279 axum::response::Response::new(axum::body::Body::from("Internal error"))
280 }),
281 }
282 })
283 }
284 }
285}
286
/// Simple stopwatch built on [`std::time::Instant`], reporting elapsed time
/// in seconds as `f64`.
pub struct Timer {
    /// Moment at which the timer was started.
    start: std::time::Instant,
}

impl Timer {
    /// Starts a new timer at the current instant.
    pub fn start() -> Self {
        let start = std::time::Instant::now();
        Self { start }
    }

    /// Returns the seconds elapsed since the timer was started, without
    /// consuming it.
    pub fn elapsed_secs(&self) -> f64 {
        let elapsed = std::time::Instant::now().duration_since(self.start);
        elapsed.as_secs_f64()
    }

    /// Consumes the timer and returns the seconds elapsed since it was started.
    pub fn stop(self) -> f64 {
        let timer = self;
        timer.elapsed_secs()
    }
}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built metrics set encodes without error.
    #[test]
    fn test_metrics_creation() {
        let forge = ForgeMetrics::new().unwrap();
        let encoded = forge.gather_text();
        assert!(encoded.is_ok());
    }

    /// Two submissions and one completion leave the submitted counter at 2
    /// and the running gauge at 1.
    #[test]
    fn test_job_metrics() {
        let forge = ForgeMetrics::new().unwrap();

        for _ in 0..2 {
            forge.record_job_submitted();
        }
        forge.record_job_completed(true);

        let exposition = forge.gather_text().unwrap();
        assert!(exposition.contains("forge_jobs_submitted_total 2"));
        assert!(exposition.contains("forge_jobs_running 1"));
    }

    /// Recorded scale events show up in the encoded output.
    #[test]
    fn test_scale_metrics() {
        let forge = ForgeMetrics::new().unwrap();

        for direction in ["up", "up", "down"].iter() {
            forge.record_scale_event("my-job", direction);
        }

        let exposition = forge.gather_text().unwrap();
        assert!(exposition.contains("forge_scale_events_total"));
    }

    /// The timer reports at least the duration slept between start and stop.
    #[test]
    fn test_timer() {
        let pause = std::time::Duration::from_millis(10);
        let stopwatch = Timer::start();
        std::thread::sleep(pause);
        let measured = stopwatch.stop();
        assert!(measured >= 0.01);
    }
}