ipfrs_interface/
metrics.rs

1//! Prometheus metrics for observability
2//!
3//! This module provides comprehensive metrics collection for monitoring
4//! IPFRS interface performance, usage patterns, and system health.
5
6use lazy_static::lazy_static;
7use prometheus::{
8    register_counter_vec, register_gauge_vec, register_histogram_vec, register_int_counter_vec,
9    register_int_gauge_vec, CounterVec, Encoder, GaugeVec, HistogramVec, IntCounterVec,
10    IntGaugeVec, TextEncoder,
11};
12use std::time::Instant;
13
14lazy_static! {
15    // HTTP Request Metrics
16
17    /// Total number of HTTP requests by endpoint and method
18    pub static ref HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
19        "ipfrs_http_requests_total",
20        "Total number of HTTP requests",
21        &["endpoint", "method", "status"]
22    )
23    .unwrap();
24
25    /// HTTP request duration in seconds
26    pub static ref HTTP_REQUEST_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
27        "ipfrs_http_request_duration_seconds",
28        "HTTP request duration in seconds",
29        &["endpoint", "method"],
30        vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
31    )
32    .unwrap();
33
34    /// HTTP request body size in bytes
35    pub static ref HTTP_REQUEST_SIZE_BYTES: HistogramVec = register_histogram_vec!(
36        "ipfrs_http_request_size_bytes",
37        "HTTP request body size in bytes",
38        &["endpoint", "method"],
39        vec![
40            100.0,
41            1_000.0,
42            10_000.0,
43            100_000.0,
44            1_000_000.0,
45            10_000_000.0,
46            100_000_000.0
47        ]
48    )
49    .unwrap();
50
51    /// HTTP response size in bytes
52    pub static ref HTTP_RESPONSE_SIZE_BYTES: HistogramVec = register_histogram_vec!(
53        "ipfrs_http_response_size_bytes",
54        "HTTP response body size in bytes",
55        &["endpoint", "method"],
56        vec![
57            100.0,
58            1_000.0,
59            10_000.0,
60            100_000.0,
61            1_000_000.0,
62            10_000_000.0,
63            100_000_000.0
64        ]
65    )
66    .unwrap();
67
68    /// Currently active HTTP connections
69    pub static ref HTTP_CONNECTIONS_ACTIVE: IntGaugeVec = register_int_gauge_vec!(
70        "ipfrs_http_connections_active",
71        "Currently active HTTP connections",
72        &["endpoint"]
73    )
74    .unwrap();
75
76    // Block Operations Metrics
77
78    /// Total blocks retrieved
79    pub static ref BLOCKS_RETRIEVED_TOTAL: IntCounterVec = register_int_counter_vec!(
80        "ipfrs_blocks_retrieved_total",
81        "Total number of blocks retrieved",
82        &["source"]
83    )
84    .unwrap();
85
86    /// Total blocks stored
87    pub static ref BLOCKS_STORED_TOTAL: IntCounterVec = register_int_counter_vec!(
88        "ipfrs_blocks_stored_total",
89        "Total number of blocks stored",
90        &["destination"]
91    )
92    .unwrap();
93
94    /// Block operation errors
95    pub static ref BLOCK_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!(
96        "ipfrs_block_errors_total",
97        "Total number of block operation errors",
98        &["operation", "error_type"]
99    )
100    .unwrap();
101
102    /// Block retrieval duration
103    pub static ref BLOCK_RETRIEVAL_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
104        "ipfrs_block_retrieval_duration_seconds",
105        "Block retrieval duration in seconds",
106        &["source"],
107        vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
108    )
109    .unwrap();
110
111    // Batch Operations Metrics
112
113    /// Batch operation size (number of items)
114    pub static ref BATCH_OPERATION_SIZE: HistogramVec = register_histogram_vec!(
115        "ipfrs_batch_operation_size",
116        "Number of items in batch operations",
117        &["operation"],
118        vec![1.0, 10.0, 50.0, 100.0, 500.0, 1000.0]
119    )
120    .unwrap();
121
122    /// Batch operation duration
123    pub static ref BATCH_OPERATION_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
124        "ipfrs_batch_operation_duration_seconds",
125        "Batch operation duration in seconds",
126        &["operation"],
127        vec![0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0]
128    )
129    .unwrap();
130
131    // Streaming Metrics
132
133    /// Total bytes uploaded
134    pub static ref UPLOAD_BYTES_TOTAL: CounterVec = register_counter_vec!(
135        "ipfrs_upload_bytes_total",
136        "Total bytes uploaded",
137        &["endpoint"]
138    )
139    .unwrap();
140
141    /// Total bytes downloaded
142    pub static ref DOWNLOAD_BYTES_TOTAL: CounterVec = register_counter_vec!(
143        "ipfrs_download_bytes_total",
144        "Total bytes downloaded",
145        &["endpoint"]
146    )
147    .unwrap();
148
149    /// Active streaming operations
150    pub static ref STREAMING_OPERATIONS_ACTIVE: IntGaugeVec = register_int_gauge_vec!(
151        "ipfrs_streaming_operations_active",
152        "Currently active streaming operations",
153        &["type"]
154    )
155    .unwrap();
156
157    /// Streaming chunk size
158    pub static ref STREAMING_CHUNK_SIZE_BYTES: HistogramVec = register_histogram_vec!(
159        "ipfrs_streaming_chunk_size_bytes",
160        "Streaming chunk size in bytes",
161        &["operation"],
162        vec![
163            1024.0,
164            4096.0,
165            16384.0,
166            65536.0,
167            262144.0,
168            1048576.0
169        ]
170    )
171    .unwrap();
172
173    // Cache Metrics
174
175    /// Cache hits
176    pub static ref CACHE_HITS_TOTAL: IntCounterVec = register_int_counter_vec!(
177        "ipfrs_cache_hits_total",
178        "Total cache hits",
179        &["cache_type"]
180    )
181    .unwrap();
182
183    /// Cache misses
184    pub static ref CACHE_MISSES_TOTAL: IntCounterVec = register_int_counter_vec!(
185        "ipfrs_cache_misses_total",
186        "Total cache misses",
187        &["cache_type"]
188    )
189    .unwrap();
190
191    /// Current cache size
192    pub static ref CACHE_SIZE_BYTES: GaugeVec = register_gauge_vec!(
193        "ipfrs_cache_size_bytes",
194        "Current cache size in bytes",
195        &["cache_type"]
196    )
197    .unwrap();
198
199    // Authentication Metrics
200
201    /// Authentication attempts
202    pub static ref AUTH_ATTEMPTS_TOTAL: IntCounterVec = register_int_counter_vec!(
203        "ipfrs_auth_attempts_total",
204        "Total authentication attempts",
205        &["method", "result"]
206    )
207    .unwrap();
208
209    /// Active authenticated sessions
210    pub static ref AUTH_SESSIONS_ACTIVE: IntGaugeVec = register_int_gauge_vec!(
211        "ipfrs_auth_sessions_active",
212        "Currently active authenticated sessions",
213        &["user"]
214    )
215    .unwrap();
216
217    // Rate Limiting Metrics
218
219    /// Rate limit hits (requests blocked)
220    pub static ref RATE_LIMIT_HITS_TOTAL: IntCounterVec = register_int_counter_vec!(
221        "ipfrs_rate_limit_hits_total",
222        "Total rate limit hits (requests blocked)",
223        &["endpoint", "client_ip"]
224    )
225    .unwrap();
226
227    /// Available rate limit tokens
228    pub static ref RATE_LIMIT_TOKENS_AVAILABLE: GaugeVec = register_gauge_vec!(
229        "ipfrs_rate_limit_tokens_available",
230        "Available rate limit tokens",
231        &["client_ip"]
232    )
233    .unwrap();
234
235    // WebSocket Metrics
236
237    /// Active WebSocket connections
238    pub static ref WEBSOCKET_CONNECTIONS_ACTIVE: IntGaugeVec = register_int_gauge_vec!(
239        "ipfrs_websocket_connections_active",
240        "Currently active WebSocket connections",
241        &["topic"]
242    )
243    .unwrap();
244
245    /// WebSocket messages sent
246    pub static ref WEBSOCKET_MESSAGES_SENT_TOTAL: IntCounterVec = register_int_counter_vec!(
247        "ipfrs_websocket_messages_sent_total",
248        "Total WebSocket messages sent",
249        &["topic", "event_type"]
250    )
251    .unwrap();
252
253    /// WebSocket messages received
254    pub static ref WEBSOCKET_MESSAGES_RECEIVED_TOTAL: IntCounterVec = register_int_counter_vec!(
255        "ipfrs_websocket_messages_received_total",
256        "Total WebSocket messages received",
257        &["message_type"]
258    )
259    .unwrap();
260
261    // gRPC Metrics
262
263    /// gRPC requests total
264    pub static ref GRPC_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
265        "ipfrs_grpc_requests_total",
266        "Total gRPC requests",
267        &["service", "method", "status"]
268    )
269    .unwrap();
270
271    /// gRPC request duration
272    pub static ref GRPC_REQUEST_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
273        "ipfrs_grpc_request_duration_seconds",
274        "gRPC request duration in seconds",
275        &["service", "method"],
276        vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
277    )
278    .unwrap();
279
280    // Tensor Operations Metrics
281
282    /// Tensor operations total
283    pub static ref TENSOR_OPERATIONS_TOTAL: IntCounterVec = register_int_counter_vec!(
284        "ipfrs_tensor_operations_total",
285        "Total tensor operations",
286        &["operation", "dtype"]
287    )
288    .unwrap();
289
290    /// Tensor slice operations
291    pub static ref TENSOR_SLICE_OPERATIONS_TOTAL: IntCounterVec = register_int_counter_vec!(
292        "ipfrs_tensor_slice_operations_total",
293        "Total tensor slice operations",
294        &["dimensions"]
295    )
296    .unwrap();
297
298    /// Tensor size in bytes
299    pub static ref TENSOR_SIZE_BYTES: HistogramVec = register_histogram_vec!(
300        "ipfrs_tensor_size_bytes",
301        "Tensor size in bytes",
302        &["dtype"],
303        vec![
304            1000.0,
305            10_000.0,
306            100_000.0,
307            1_000_000.0,
308            10_000_000.0,
309            100_000_000.0,
310            1_000_000_000.0
311        ]
312    )
313    .unwrap();
314
315    // System Metrics
316
317    /// Total memory allocated (in bytes)
318    pub static ref MEMORY_ALLOCATED_BYTES: IntGaugeVec = register_int_gauge_vec!(
319        "ipfrs_memory_allocated_bytes",
320        "Total memory allocated in bytes",
321        &["component"]
322    )
323    .unwrap();
324
325    /// Number of goroutines (async tasks)
326    pub static ref ASYNC_TASKS_ACTIVE: IntGaugeVec = register_int_gauge_vec!(
327        "ipfrs_async_tasks_active",
328        "Currently active async tasks",
329        &["type"]
330    )
331    .unwrap();
332}
333
334/// Helper struct for timing operations
335pub struct Timer {
336    start: Instant,
337    labels: Vec<String>,
338}
339
340impl Timer {
341    /// Create a new timer with labels
342    pub fn new(labels: Vec<String>) -> Self {
343        Self {
344            start: Instant::now(),
345            labels,
346        }
347    }
348
349    /// Observe the duration and record it to the given histogram
350    pub fn observe_duration(self, histogram: &HistogramVec) {
351        let duration = self.start.elapsed().as_secs_f64();
352        histogram
353            .with_label_values(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
354            .observe(duration);
355    }
356}
357
358/// Record an HTTP request
359#[allow(dead_code)]
360pub fn record_http_request(endpoint: &str, method: &str, status: u16) {
361    HTTP_REQUESTS_TOTAL
362        .with_label_values(&[endpoint, method, &status.to_string()])
363        .inc();
364}
365
366/// Start timing an HTTP request
367#[allow(dead_code)]
368pub fn start_http_request_timer(endpoint: &str, method: &str) -> Timer {
369    HTTP_CONNECTIONS_ACTIVE.with_label_values(&[endpoint]).inc();
370    Timer::new(vec![endpoint.to_string(), method.to_string()])
371}
372
373/// Finish timing an HTTP request
374#[allow(dead_code)]
375pub fn finish_http_request_timer(timer: Timer, endpoint: &str) {
376    timer.observe_duration(&HTTP_REQUEST_DURATION_SECONDS);
377    HTTP_CONNECTIONS_ACTIVE.with_label_values(&[endpoint]).dec();
378}
379
380/// Record HTTP request size
381#[allow(dead_code)]
382pub fn record_http_request_size(endpoint: &str, method: &str, size: usize) {
383    HTTP_REQUEST_SIZE_BYTES
384        .with_label_values(&[endpoint, method])
385        .observe(size as f64);
386}
387
388/// Record HTTP response size
389#[allow(dead_code)]
390pub fn record_http_response_size(endpoint: &str, method: &str, size: usize) {
391    HTTP_RESPONSE_SIZE_BYTES
392        .with_label_values(&[endpoint, method])
393        .observe(size as f64);
394}
395
396/// Record block retrieval
397#[allow(dead_code)]
398pub fn record_block_retrieved(source: &str) {
399    BLOCKS_RETRIEVED_TOTAL.with_label_values(&[source]).inc();
400}
401
402/// Record block storage
403#[allow(dead_code)]
404pub fn record_block_stored(destination: &str) {
405    BLOCKS_STORED_TOTAL.with_label_values(&[destination]).inc();
406}
407
408/// Record block error
409#[allow(dead_code)]
410pub fn record_block_error(operation: &str, error_type: &str) {
411    BLOCK_ERRORS_TOTAL
412        .with_label_values(&[operation, error_type])
413        .inc();
414}
415
416/// Record upload bytes
417#[allow(dead_code)]
418pub fn record_upload_bytes(endpoint: &str, bytes: u64) {
419    UPLOAD_BYTES_TOTAL
420        .with_label_values(&[endpoint])
421        .inc_by(bytes as f64);
422}
423
424/// Record download bytes
425#[allow(dead_code)]
426pub fn record_download_bytes(endpoint: &str, bytes: u64) {
427    DOWNLOAD_BYTES_TOTAL
428        .with_label_values(&[endpoint])
429        .inc_by(bytes as f64);
430}
431
432/// Record cache hit
433#[allow(dead_code)]
434pub fn record_cache_hit(cache_type: &str) {
435    CACHE_HITS_TOTAL.with_label_values(&[cache_type]).inc();
436}
437
438/// Record cache miss
439#[allow(dead_code)]
440pub fn record_cache_miss(cache_type: &str) {
441    CACHE_MISSES_TOTAL.with_label_values(&[cache_type]).inc();
442}
443
444/// Record authentication attempt
445#[allow(dead_code)]
446pub fn record_auth_attempt(method: &str, result: &str) {
447    AUTH_ATTEMPTS_TOTAL
448        .with_label_values(&[method, result])
449        .inc();
450}
451
452/// Record rate limit hit
453#[allow(dead_code)]
454pub fn record_rate_limit_hit(endpoint: &str, client_ip: &str) {
455    RATE_LIMIT_HITS_TOTAL
456        .with_label_values(&[endpoint, client_ip])
457        .inc();
458}
459
460/// Encode all metrics in Prometheus text format
461pub fn encode_metrics() -> Result<String, prometheus::Error> {
462    let encoder = TextEncoder::new();
463    let metric_families = prometheus::gather();
464    let mut buffer = Vec::new();
465    encoder.encode(&metric_families, &mut buffer)?;
466    String::from_utf8(buffer)
467        .map_err(|e| prometheus::Error::Msg(format!("Failed to encode metrics as UTF-8: {}", e)))
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    // All tests share the process-global default registry and may run in
    // parallel. Assertions therefore use label values unique to each test
    // and check lower bounds where another test could touch the same series.

    #[test]
    fn test_record_http_request() {
        record_http_request("/api/v0/add", "POST", 200);

        let count = HTTP_REQUESTS_TOTAL
            .with_label_values(&["/api/v0/add", "POST", "200"])
            .get();
        assert!(count >= 1);

        let metrics = encode_metrics().expect("metrics should encode");
        assert!(metrics.contains("ipfrs_http_requests_total"));
    }

    #[test]
    fn test_timer() {
        // No sleep needed: the assertions only check that an observation
        // was recorded and exported, not its magnitude.
        let timer = Timer::new(vec!["test".to_string(), "GET".to_string()]);
        timer.observe_duration(&HTTP_REQUEST_DURATION_SECONDS);

        let samples = HTTP_REQUEST_DURATION_SECONDS
            .with_label_values(&["test", "GET"])
            .get_sample_count();
        assert!(samples >= 1);

        let metrics = encode_metrics().expect("metrics should encode");
        assert!(metrics.contains("ipfrs_http_request_duration_seconds"));
    }

    #[test]
    fn test_http_request_timer_pairing_restores_active_gauge() {
        let endpoint = "/test/timer_pairing";
        let gauge = HTTP_CONNECTIONS_ACTIVE.with_label_values(&[endpoint]);
        let before = gauge.get();

        let timer = start_http_request_timer(endpoint, "GET");
        assert_eq!(gauge.get(), before + 1);

        finish_http_request_timer(timer, endpoint);
        assert_eq!(gauge.get(), before);
    }

    #[test]
    fn test_record_block_operations() {
        record_block_retrieved("local");
        record_block_stored("blockstore");
        record_block_error("get", "not_found");

        assert!(BLOCKS_RETRIEVED_TOTAL.with_label_values(&["local"]).get() >= 1);
        assert!(BLOCKS_STORED_TOTAL.with_label_values(&["blockstore"]).get() >= 1);
        assert!(
            BLOCK_ERRORS_TOTAL
                .with_label_values(&["get", "not_found"])
                .get()
                >= 1
        );

        let metrics = encode_metrics().expect("metrics should encode");
        assert!(metrics.contains("ipfrs_blocks_retrieved_total"));
        assert!(metrics.contains("ipfrs_blocks_stored_total"));
        assert!(metrics.contains("ipfrs_block_errors_total"));
    }

    #[test]
    fn test_record_cache_operations() {
        record_cache_hit("block_cache");
        record_cache_miss("block_cache");

        assert!(CACHE_HITS_TOTAL.with_label_values(&["block_cache"]).get() >= 1);
        assert!(CACHE_MISSES_TOTAL.with_label_values(&["block_cache"]).get() >= 1);

        let metrics = encode_metrics().expect("metrics should encode");
        assert!(metrics.contains("ipfrs_cache_hits_total"));
        assert!(metrics.contains("ipfrs_cache_misses_total"));
    }

    #[test]
    fn test_encode_metrics() {
        // Touch both metrics so their lazily-registered families exist.
        record_http_request("/test", "GET", 200);
        record_block_retrieved("test_store");

        let metrics = encode_metrics().expect("metrics should encode");

        // Both metrics were recorded above, so both must be present.
        assert!(metrics.contains("ipfrs_http_requests_total"));
        assert!(metrics.contains("ipfrs_blocks_retrieved_total"));
    }
}