// jflow_core/metrics.rs

//! Unified Prometheus metrics for all JANUS modules
2
3use prometheus::{
4    Counter, CounterVec, Gauge, GaugeVec, HistogramVec, register_counter, register_counter_vec,
5    register_gauge, register_gauge_vec, register_histogram_vec,
6};
7use std::sync::OnceLock;
8
9/// Global metrics instance
10static METRICS: OnceLock<JanusMetrics> = OnceLock::new();
11
12/// Get or initialize the global metrics
13pub fn metrics() -> &'static JanusMetrics {
14    METRICS.get_or_init(JanusMetrics::new)
15}
16
17/// Unified metrics for all JANUS modules
18pub struct JanusMetrics {
19    // =========================================================================
20    // Signal Metrics
21    // =========================================================================
22    /// Total signals generated
23    pub signals_generated_total: CounterVec,
24    /// Signal generation latency
25    pub signal_generation_duration: HistogramVec,
26    /// Current signal confidence
27    pub signal_confidence: GaugeVec,
28
29    // =========================================================================
30    // Module Metrics
31    // =========================================================================
32    /// Module health (1 = healthy, 0 = unhealthy)
33    pub module_health: GaugeVec,
34    /// Module uptime in seconds
35    pub module_uptime_seconds: GaugeVec,
36
37    // =========================================================================
38    // API Metrics
39    // =========================================================================
40    /// HTTP requests total
41    pub http_requests_total: CounterVec,
42    /// HTTP request duration
43    pub http_request_duration: HistogramVec,
44    /// HTTP requests in flight
45    pub http_requests_in_flight: Gauge,
46
47    // =========================================================================
48    // Database Metrics
49    // =========================================================================
50    /// Database query count
51    pub db_queries_total: CounterVec,
52    /// Database query duration
53    pub db_query_duration: HistogramVec,
54    /// Database connection pool size
55    pub db_pool_size: Gauge,
56
57    // =========================================================================
58    // Redis Metrics
59    // =========================================================================
60    /// Redis operations total
61    pub redis_operations_total: CounterVec,
62    /// Redis operation duration
63    pub redis_operation_duration: HistogramVec,
64    /// Redis connection status
65    pub redis_connected: Gauge,
66
67    // =========================================================================
68    // WebSocket Metrics
69    // =========================================================================
70    /// Active WebSocket connections
71    pub websocket_connections: Gauge,
72    /// WebSocket messages sent
73    pub websocket_messages_sent: CounterVec,
74
75    // =========================================================================
76    // Supervisor Metrics
77    // =========================================================================
78    /// Total number of service restarts across all supervised services.
79    /// Maps to: `janus_supervisor_restarts_total`
80    pub supervisor_restarts_total: Counter,
81    /// Number of services currently in a non-terminal phase.
82    /// Maps to: `janus_supervisor_active_services`
83    pub supervisor_active_services: Gauge,
84    /// Total number of services ever spawned (including initial + restarts).
85    /// Maps to: `janus_supervisor_spawned_total`
86    pub supervisor_spawned_total: Counter,
87    /// Total number of services that have terminated.
88    /// Maps to: `janus_supervisor_terminated_total`
89    pub supervisor_terminated_total: Counter,
90    /// Total number of circuit breaker trips.
91    /// Maps to: `janus_supervisor_circuit_breaker_trips_total`
92    pub supervisor_circuit_breaker_trips: Counter,
93    /// Per-service uptime histogram (seconds).
94    /// Maps to: `janus_supervisor_uptime_seconds`
95    pub supervisor_uptime_seconds: HistogramVec,
96}
97
98impl JanusMetrics {
99    /// Create new metrics instance
100    pub fn new() -> Self {
101        Self {
102            // Signal metrics
103            signals_generated_total: register_counter_vec!(
104                "janus_signals_generated_total",
105                "Total number of signals generated",
106                &["module", "signal_type", "symbol"]
107            )
108            .unwrap(),
109
110            signal_generation_duration: register_histogram_vec!(
111                "janus_signal_generation_duration_seconds",
112                "Time spent generating signals",
113                &["module"]
114            )
115            .unwrap(),
116
117            signal_confidence: register_gauge_vec!(
118                "janus_signal_confidence",
119                "Confidence of the latest signal",
120                &["symbol", "signal_type"]
121            )
122            .unwrap(),
123
124            // Module metrics
125            module_health: register_gauge_vec!(
126                "janus_module_health",
127                "Module health status (1=healthy, 0=unhealthy)",
128                &["module"]
129            )
130            .unwrap(),
131
132            module_uptime_seconds: register_gauge_vec!(
133                "janus_module_uptime_seconds",
134                "Module uptime in seconds",
135                &["module"]
136            )
137            .unwrap(),
138
139            // API metrics
140            http_requests_total: register_counter_vec!(
141                "janus_http_requests_total",
142                "Total HTTP requests",
143                &["method", "path", "status"]
144            )
145            .unwrap(),
146
147            http_request_duration: register_histogram_vec!(
148                "janus_http_request_duration_seconds",
149                "HTTP request duration",
150                &["method", "path"]
151            )
152            .unwrap(),
153
154            http_requests_in_flight: register_gauge!(
155                "janus_http_requests_in_flight",
156                "Number of HTTP requests currently being processed"
157            )
158            .unwrap(),
159
160            // Database metrics
161            db_queries_total: register_counter_vec!(
162                "janus_db_queries_total",
163                "Total database queries",
164                &["operation", "table"]
165            )
166            .unwrap(),
167
168            db_query_duration: register_histogram_vec!(
169                "janus_db_query_duration_seconds",
170                "Database query duration",
171                &["operation"]
172            )
173            .unwrap(),
174
175            db_pool_size: register_gauge!("janus_db_pool_size", "Database connection pool size")
176                .unwrap(),
177
178            // Redis metrics
179            redis_operations_total: register_counter_vec!(
180                "janus_redis_operations_total",
181                "Total Redis operations",
182                &["operation"]
183            )
184            .unwrap(),
185
186            redis_operation_duration: register_histogram_vec!(
187                "janus_redis_operation_duration_seconds",
188                "Redis operation duration",
189                &["operation"]
190            )
191            .unwrap(),
192
193            redis_connected: register_gauge!(
194                "janus_redis_connected",
195                "Redis connection status (1=connected, 0=disconnected)"
196            )
197            .unwrap(),
198
199            // WebSocket metrics
200            websocket_connections: register_gauge!(
201                "janus_websocket_connections",
202                "Number of active WebSocket connections"
203            )
204            .unwrap(),
205
206            websocket_messages_sent: register_counter_vec!(
207                "janus_websocket_messages_sent_total",
208                "Total WebSocket messages sent",
209                &["message_type"]
210            )
211            .unwrap(),
212
213            // Supervisor metrics
214            supervisor_restarts_total: register_counter!(
215                "janus_supervisor_restarts_total",
216                "Total number of service restarts across all supervised services"
217            )
218            .unwrap(),
219
220            supervisor_active_services: register_gauge!(
221                "janus_supervisor_active_services",
222                "Number of services currently in a non-terminal phase"
223            )
224            .unwrap(),
225
226            supervisor_spawned_total: register_counter!(
227                "janus_supervisor_spawned_total",
228                "Total number of services ever spawned"
229            )
230            .unwrap(),
231
232            supervisor_terminated_total: register_counter!(
233                "janus_supervisor_terminated_total",
234                "Total number of services that have terminated"
235            )
236            .unwrap(),
237
238            supervisor_circuit_breaker_trips: register_counter!(
239                "janus_supervisor_circuit_breaker_trips_total",
240                "Total number of circuit breaker trips"
241            )
242            .unwrap(),
243
244            supervisor_uptime_seconds: register_histogram_vec!(
245                "janus_supervisor_uptime_seconds",
246                "Per-service cumulative uptime in seconds",
247                &["service"]
248            )
249            .unwrap(),
250        }
251    }
252
253    /// Record a signal generation
254    pub fn record_signal(&self, module: &str, signal_type: &str, symbol: &str, confidence: f64) {
255        self.signals_generated_total
256            .with_label_values(&[module, signal_type, symbol])
257            .inc();
258        self.signal_confidence
259            .with_label_values(&[symbol, signal_type])
260            .set(confidence);
261    }
262
263    /// Record module health
264    pub fn record_module_health(&self, module: &str, healthy: bool, uptime_seconds: f64) {
265        self.module_health
266            .with_label_values(&[module])
267            .set(if healthy { 1.0 } else { 0.0 });
268        self.module_uptime_seconds
269            .with_label_values(&[module])
270            .set(uptime_seconds);
271    }
272
273    /// Record HTTP request
274    pub fn record_http_request(&self, method: &str, path: &str, status: u16, duration_secs: f64) {
275        self.http_requests_total
276            .with_label_values(&[method, path, &status.to_string()])
277            .inc();
278        self.http_request_duration
279            .with_label_values(&[method, path])
280            .observe(duration_secs);
281    }
282
283    // =========================================================================
284    // Supervisor metric helpers
285    //
286    // NOTE: The `supervisor_active_services` **gauge** is managed
287    // authoritatively by `SupervisorMetrics` (in `supervisor/mod.rs`)
288    // which uses atomic integers as the single source of truth and
289    // calls `supervisor_active_services.set(value)` directly.
290    //
291    // The helpers below intentionally do NOT touch the gauge so that
292    // there is exactly one code path responsible for it, eliminating
293    // the TOCTOU race that existed when both `SupervisorMetrics` and
294    // these helpers independently incremented / decremented the gauge.
295    // =========================================================================
296
297    /// Record a service spawn event (counter only — the `active_services`
298    /// gauge is managed by `SupervisorMetrics`).
299    pub fn record_supervisor_spawn(&self) {
300        self.supervisor_spawned_total.inc();
301    }
302
303    /// Record a service restart event.
304    pub fn record_supervisor_restart(&self) {
305        self.supervisor_restarts_total.inc();
306    }
307
308    /// Record a service termination event (counter only — the
309    /// `active_services` gauge is managed by `SupervisorMetrics`).
310    pub fn record_supervisor_termination(&self) {
311        self.supervisor_terminated_total.inc();
312    }
313
314    /// Record a circuit breaker trip event.
315    pub fn record_supervisor_circuit_breaker_trip(&self) {
316        self.supervisor_circuit_breaker_trips.inc();
317    }
318
319    /// Record a service's cumulative uptime when it terminates.
320    pub fn record_supervisor_service_uptime(&self, service: &str, uptime_secs: f64) {
321        self.supervisor_uptime_seconds
322            .with_label_values(&[service])
323            .observe(uptime_secs);
324    }
325
326    /// Get all metrics as text (for /metrics endpoint)
327    pub fn encode(&self) -> String {
328        use prometheus::Encoder;
329        let encoder = prometheus::TextEncoder::new();
330        let metric_families = prometheus::gather();
331        let mut buffer = Vec::new();
332        encoder.encode(&metric_families, &mut buffer).unwrap();
333        String::from_utf8(buffer).unwrap()
334    }
335}
336
337impl Default for JanusMetrics {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    // All tests must use the global `metrics()` singleton because
    // Prometheus metric names are globally registered — calling
    // `JanusMetrics::new()` more than once panics on duplicate
    // registration.  Since tests run in the same process (and the
    // supervisor code also calls `metrics()`), every test must go
    // through the singleton.
    //
    // (Plain `//` comments on purpose: this note describes the whole module,
    // and a `///` block here would attach to the first test function.)

    /// The singleton can be obtained and the basic helpers run without panicking.
    #[test]
    fn test_metrics_creation() {
        let m = metrics();
        // Just verify the singleton is usable and helpers don't panic.
        m.record_signal("forward", "buy", "BTCUSD", 0.85);
        m.record_module_health("forward", true, 100.0);
    }

    /// Each supervisor helper bumps its own counter.
    #[test]
    fn test_supervisor_metrics_helpers() {
        let m = metrics();

        // Because tests run in parallel and share the global Prometheus
        // registry, we cannot assert exact counter deltas (other
        // supervisor tests bump the same counters concurrently).
        // Instead we verify that:
        //   1. The helper methods don't panic
        //   2. Monotonic counters only increase

        let spawned_before = m.supervisor_spawned_total.get();
        m.record_supervisor_spawn();
        assert!(m.supervisor_spawned_total.get() > spawned_before);

        let restarts_before = m.supervisor_restarts_total.get();
        m.record_supervisor_restart();
        assert!(m.supervisor_restarts_total.get() > restarts_before);

        let terminated_before = m.supervisor_terminated_total.get();
        m.record_supervisor_termination();
        assert!(m.supervisor_terminated_total.get() > terminated_before);

        // No assertion here: only checks that observing an uptime does not panic.
        m.record_supervisor_service_uptime("test-svc", 42.5);

        let trips_before = m.supervisor_circuit_breaker_trips.get();
        m.record_supervisor_circuit_breaker_trip();
        assert!(m.supervisor_circuit_breaker_trips.get() > trips_before);
    }

    /// The gauge round-trips `set()` values and is left alone by the
    /// spawn / termination helpers.
    #[test]
    fn test_supervisor_active_services_gauge_is_settable() {
        let m = metrics();

        // The `active_services` gauge is now managed exclusively by
        // `SupervisorMetrics` via `set()`.  Verify that the gauge
        // can be set to an arbitrary value and read back correctly.
        m.supervisor_active_services.set(5.0);
        assert_eq!(m.supervisor_active_services.get(), 5.0);

        m.supervisor_active_services.set(0.0);
        assert_eq!(m.supervisor_active_services.get(), 0.0);

        // `record_supervisor_spawn` and `record_supervisor_termination`
        // no longer touch the gauge — verify that.
        m.supervisor_active_services.set(3.0);
        m.record_supervisor_spawn();
        assert_eq!(
            m.supervisor_active_services.get(),
            3.0,
            "record_supervisor_spawn should not modify the gauge"
        );
        m.record_supervisor_termination();
        assert_eq!(
            m.supervisor_active_services.get(),
            3.0,
            "record_supervisor_termination should not modify the gauge"
        );

        // Clean up: reset gauge so other tests aren't affected.
        m.supervisor_active_services.set(0.0);
    }
}