forge_orchestration/
metrics.rs

//! Metrics and monitoring for Forge
//!
//! ## Table of Contents
//! - **ForgeMetrics**: Core Prometheus metrics and the registry that holds them
//! - **MetricsHook**: Custom metrics callback trait
//! - **MetricsExporter**: Prometheus metrics exporter (requires `metrics` feature)
//! - **Timer**: Helper for measuring operation durations in seconds
7
8use crate::error::{ForgeError, Result};
9use prometheus::{
10    Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, Registry,
11};
12use std::sync::Arc;
13use tracing::info;
14
15/// Core metrics for Forge
16pub struct ForgeMetrics {
17    registry: Registry,
18
19    // Job metrics
20    pub jobs_submitted: Counter,
21    pub jobs_running: Gauge,
22    pub jobs_completed: CounterVec,
23
24    // Scaling metrics
25    pub scale_events: CounterVec,
26    pub current_instances: GaugeVec,
27
28    // Routing metrics
29    pub route_requests: CounterVec,
30    pub route_latency: HistogramVec,
31
32    // Resource metrics
33    pub cpu_utilization: GaugeVec,
34    pub memory_utilization: GaugeVec,
35
36    // Network metrics
37    pub requests_total: CounterVec,
38    pub request_duration: HistogramVec,
39    pub active_connections: Gauge,
40}
41
42impl ForgeMetrics {
43    /// Create a new metrics instance
44    pub fn new() -> Result<Self> {
45        let registry = Registry::new();
46
47        // Job metrics
48        let jobs_submitted = Counter::new("forge_jobs_submitted_total", "Total jobs submitted")?;
49        let jobs_running = Gauge::new("forge_jobs_running", "Currently running jobs")?;
50        let jobs_completed = CounterVec::new(
51            Opts::new("forge_jobs_completed_total", "Total jobs completed"),
52            &["status"],
53        )?;
54
55        // Scaling metrics
56        let scale_events = CounterVec::new(
57            Opts::new("forge_scale_events_total", "Total scaling events"),
58            &["job", "direction"],
59        )?;
60        let current_instances = GaugeVec::new(
61            Opts::new("forge_instances", "Current instance count"),
62            &["job", "group"],
63        )?;
64
65        // Routing metrics
66        let route_requests = CounterVec::new(
67            Opts::new("forge_route_requests_total", "Total routing requests"),
68            &["router", "expert"],
69        )?;
70        let route_latency = HistogramVec::new(
71            HistogramOpts::new("forge_route_latency_seconds", "Routing latency")
72                .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
73            &["router"],
74        )?;
75
76        // Resource metrics
77        let cpu_utilization = GaugeVec::new(
78            Opts::new("forge_cpu_utilization", "CPU utilization (0-1)"),
79            &["job", "group"],
80        )?;
81        let memory_utilization = GaugeVec::new(
82            Opts::new("forge_memory_utilization", "Memory utilization (0-1)"),
83            &["job", "group"],
84        )?;
85
86        // Network metrics
87        let requests_total = CounterVec::new(
88            Opts::new("forge_http_requests_total", "Total HTTP requests"),
89            &["method", "path", "status"],
90        )?;
91        let request_duration = HistogramVec::new(
92            HistogramOpts::new("forge_http_request_duration_seconds", "HTTP request duration")
93                .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]),
94            &["method", "path"],
95        )?;
96        let active_connections = Gauge::new("forge_active_connections", "Active connections")?;
97
98        // Register all metrics
99        registry.register(Box::new(jobs_submitted.clone()))?;
100        registry.register(Box::new(jobs_running.clone()))?;
101        registry.register(Box::new(jobs_completed.clone()))?;
102        registry.register(Box::new(scale_events.clone()))?;
103        registry.register(Box::new(current_instances.clone()))?;
104        registry.register(Box::new(route_requests.clone()))?;
105        registry.register(Box::new(route_latency.clone()))?;
106        registry.register(Box::new(cpu_utilization.clone()))?;
107        registry.register(Box::new(memory_utilization.clone()))?;
108        registry.register(Box::new(requests_total.clone()))?;
109        registry.register(Box::new(request_duration.clone()))?;
110        registry.register(Box::new(active_connections.clone()))?;
111
112        Ok(Self {
113            registry,
114            jobs_submitted,
115            jobs_running,
116            jobs_completed,
117            scale_events,
118            current_instances,
119            route_requests,
120            route_latency,
121            cpu_utilization,
122            memory_utilization,
123            requests_total,
124            request_duration,
125            active_connections,
126        })
127    }
128
129    /// Get the Prometheus registry
130    pub fn registry(&self) -> &Registry {
131        &self.registry
132    }
133
134    /// Record a job submission
135    pub fn record_job_submitted(&self) {
136        self.jobs_submitted.inc();
137        self.jobs_running.inc();
138    }
139
140    /// Record a job completion
141    pub fn record_job_completed(&self, success: bool) {
142        self.jobs_running.dec();
143        let status = if success { "success" } else { "failed" };
144        self.jobs_completed.with_label_values(&[status]).inc();
145    }
146
147    /// Record a scaling event
148    pub fn record_scale_event(&self, job: &str, direction: &str) {
149        self.scale_events.with_label_values(&[job, direction]).inc();
150    }
151
152    /// Update instance count
153    pub fn set_instances(&self, job: &str, group: &str, count: f64) {
154        self.current_instances
155            .with_label_values(&[job, group])
156            .set(count);
157    }
158
159    /// Record a routing request
160    pub fn record_route(&self, router: &str, expert: usize, latency_secs: f64) {
161        let expert_str = expert.to_string();
162        self.route_requests
163            .with_label_values(&[router, &expert_str])
164            .inc();
165        self.route_latency
166            .with_label_values(&[router])
167            .observe(latency_secs);
168    }
169
170    /// Update resource utilization
171    pub fn set_utilization(&self, job: &str, group: &str, cpu: f64, memory: f64) {
172        self.cpu_utilization
173            .with_label_values(&[job, group])
174            .set(cpu);
175        self.memory_utilization
176            .with_label_values(&[job, group])
177            .set(memory);
178    }
179
180    /// Record an HTTP request
181    pub fn record_http_request(&self, method: &str, path: &str, status: u16, duration_secs: f64) {
182        let status_str = status.to_string();
183        self.requests_total
184            .with_label_values(&[method, path, &status_str])
185            .inc();
186        self.request_duration
187            .with_label_values(&[method, path])
188            .observe(duration_secs);
189    }
190
191    /// Gather all metrics as text
192    pub fn gather_text(&self) -> Result<String> {
193        use prometheus::Encoder;
194        let encoder = prometheus::TextEncoder::new();
195        let metric_families = self.registry.gather();
196        let mut buffer = Vec::new();
197        encoder
198            .encode(&metric_families, &mut buffer)
199            .map_err(|e| ForgeError::metrics(format!("Encode error: {}", e)))?;
200        String::from_utf8(buffer).map_err(|e| ForgeError::metrics(format!("UTF8 error: {}", e)))
201    }
202}
203
204impl Default for ForgeMetrics {
205    fn default() -> Self {
206        match Self::new() {
207            Ok(m) => m,
208            Err(e) => {
209                tracing::error!(error = %e, "Failed to create metrics, using stub");
210                panic!("ForgeMetrics::default() failed: {}", e);
211            }
212        }
213    }
214}
215
216/// Trait for custom metrics hooks
217pub trait MetricsHook: Send + Sync {
218    /// Called periodically to collect custom metrics
219    fn collect(&self, metrics: &ForgeMetrics);
220
221    /// Hook name for identification
222    fn name(&self) -> &str;
223}
224
225/// Metrics exporter for Prometheus scraping
226pub struct MetricsExporter {
227    metrics: Arc<ForgeMetrics>,
228    hooks: Vec<Box<dyn MetricsHook>>,
229}
230
231impl MetricsExporter {
232    /// Create a new exporter
233    pub fn new(metrics: Arc<ForgeMetrics>) -> Self {
234        Self {
235            metrics,
236            hooks: Vec::new(),
237        }
238    }
239
240    /// Register a custom metrics hook
241    pub fn register_hook(&mut self, hook: Box<dyn MetricsHook>) {
242        info!(hook = %hook.name(), "Registered metrics hook");
243        self.hooks.push(hook);
244    }
245
246    /// Collect all metrics
247    pub fn collect(&self) {
248        for hook in &self.hooks {
249            hook.collect(&self.metrics);
250        }
251    }
252
253    /// Get metrics as Prometheus text format
254    pub fn export(&self) -> Result<String> {
255        self.collect();
256        self.metrics.gather_text()
257    }
258
259    /// Create an axum handler for /metrics endpoint
260    pub fn handler(
261        metrics: Arc<ForgeMetrics>,
262    ) -> impl Fn() -> std::pin::Pin<
263        Box<dyn std::future::Future<Output = axum::response::Response> + Send>,
264    > + Clone {
265        move || {
266            let metrics = Arc::clone(&metrics);
267            Box::pin(async move {
268                match metrics.gather_text() {
269                    Ok(text) => axum::response::Response::builder()
270                        .header("Content-Type", "text/plain; charset=utf-8")
271                        .body(axum::body::Body::from(text))
272                        .unwrap_or_else(|_| {
273                            axum::response::Response::new(axum::body::Body::from("Internal error"))
274                        }),
275                    Err(e) => axum::response::Response::builder()
276                        .status(500)
277                        .body(axum::body::Body::from(format!("Error: {}", e)))
278                        .unwrap_or_else(|_| {
279                            axum::response::Response::new(axum::body::Body::from("Internal error"))
280                        }),
281                }
282            })
283        }
284    }
285}
286
/// Wall-clock timer for measuring how long an operation takes.
///
/// Backed by [`std::time::Instant`], so readings are monotonic and never
/// negative. The timer is a plain value: copying it is cheap, and dropping it
/// has no side effects.
#[derive(Debug, Clone, Copy)]
pub struct Timer {
    start: std::time::Instant,
}

impl Timer {
    /// Starts a new timer at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: std::time::Instant::now(),
        }
    }

    /// Returns the seconds elapsed since [`Timer::start`] without stopping the timer.
    #[must_use]
    pub fn elapsed_secs(&self) -> f64 {
        self.start.elapsed().as_secs_f64()
    }

    /// Consumes the timer and returns the seconds elapsed since it was started.
    #[must_use]
    pub fn stop(self) -> f64 {
        self.elapsed_secs()
    }
}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` if the exposition `text` contains a sample line exactly
    /// equal to `line`. Exact matching avoids false positives such as
    /// `"forge_jobs_running 1"` matching a line ending in `10`.
    fn has_sample(text: &str, line: &str) -> bool {
        text.lines().any(|l| l == line)
    }

    #[test]
    fn test_metrics_creation() {
        let metrics = ForgeMetrics::new().unwrap();
        assert!(metrics.gather_text().is_ok());
    }

    #[test]
    fn test_job_metrics() {
        let metrics = ForgeMetrics::new().unwrap();

        metrics.record_job_submitted();
        metrics.record_job_submitted();
        metrics.record_job_completed(true);

        let text = metrics.gather_text().unwrap();
        assert!(has_sample(&text, "forge_jobs_submitted_total 2"));
        assert!(has_sample(&text, "forge_jobs_running 1"));
        assert!(has_sample(
            &text,
            r#"forge_jobs_completed_total{status="success"} 1"#
        ));
    }

    #[test]
    fn test_failed_job_metrics() {
        let metrics = ForgeMetrics::new().unwrap();

        metrics.record_job_submitted();
        metrics.record_job_completed(false);

        let text = metrics.gather_text().unwrap();
        assert!(has_sample(&text, "forge_jobs_running 0"));
        assert!(has_sample(
            &text,
            r#"forge_jobs_completed_total{status="failed"} 1"#
        ));
    }

    #[test]
    fn test_scale_metrics() {
        let metrics = ForgeMetrics::new().unwrap();

        metrics.record_scale_event("my-job", "up");
        metrics.record_scale_event("my-job", "up");
        metrics.record_scale_event("my-job", "down");

        let text = metrics.gather_text().unwrap();
        assert!(has_sample(
            &text,
            r#"forge_scale_events_total{direction="up",job="my-job"} 2"#
        ));
        assert!(has_sample(
            &text,
            r#"forge_scale_events_total{direction="down",job="my-job"} 1"#
        ));
    }

    #[test]
    fn test_route_metrics() {
        let metrics = ForgeMetrics::new().unwrap();

        metrics.record_route("topk", 3, 0.02);

        let text = metrics.gather_text().unwrap();
        assert!(has_sample(
            &text,
            r#"forge_route_requests_total{expert="3",router="topk"} 1"#
        ));
        assert!(has_sample(
            &text,
            r#"forge_route_latency_seconds_count{router="topk"} 1"#
        ));
    }

    #[test]
    fn test_http_metrics() {
        let metrics = ForgeMetrics::new().unwrap();

        metrics.record_http_request("GET", "/jobs", 200, 0.1);

        let text = metrics.gather_text().unwrap();
        assert!(has_sample(
            &text,
            r#"forge_http_requests_total{method="GET",path="/jobs",status="200"} 1"#
        ));
        assert!(has_sample(
            &text,
            r#"forge_http_request_duration_seconds_count{method="GET",path="/jobs"} 1"#
        ));
    }

    #[test]
    fn test_exporter_runs_hooks_before_export() {
        struct FixedInstances;

        impl MetricsHook for FixedInstances {
            fn collect(&self, metrics: &ForgeMetrics) {
                metrics.set_instances("my-job", "web", 3.0);
            }

            fn name(&self) -> &str {
                "fixed-instances"
            }
        }

        let metrics = Arc::new(ForgeMetrics::new().unwrap());
        let mut exporter = MetricsExporter::new(Arc::clone(&metrics));
        exporter.register_hook(Box::new(FixedInstances));

        let text = exporter.export().unwrap();
        assert!(has_sample(
            &text,
            r#"forge_instances{group="web",job="my-job"} 3"#
        ));
    }

    #[test]
    fn test_timer() {
        let timer = Timer::start();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = timer.stop();
        assert!(elapsed >= 0.01);
    }
}