Skip to main content

network_protocol/utils/
metrics.rs

//! Observability and metrics.
//!
//! This module provides metrics collection and observability features
//! for monitoring protocol performance and health.
//!
//! Counters are stored in atomics (`AtomicU64`), so they can be updated
//! concurrently from multiple threads without external locking.
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::time::Instant;
10use tracing::{debug, info};
11
/// Collector of protocol-level counters.
///
/// Every counter is an [`AtomicU64`], so the recording methods only need
/// `&self` and may be called from many threads at once. A process-wide
/// instance is available through [`global_metrics`]; use
/// [`Metrics::snapshot`] to obtain a plain-value copy of all counters.
#[derive(Debug)]
pub struct Metrics {
    // ----- connections -----
    /// Connections established since this collector was created.
    pub connections_total: AtomicU64,
    /// Gauge of connections currently open (established minus closed).
    pub connections_active: AtomicU64,

    // ----- handshakes -----
    /// Handshakes that were started.
    pub handshakes_total: AtomicU64,
    /// Handshakes that completed successfully.
    pub handshakes_success: AtomicU64,
    /// Handshakes that ended in failure.
    pub handshakes_failed: AtomicU64,

    // ----- traffic -----
    /// Messages sent.
    pub messages_sent: AtomicU64,
    /// Messages received.
    pub messages_received: AtomicU64,
    /// Sum of the byte counts reported for sent messages.
    pub bytes_sent: AtomicU64,
    /// Sum of the byte counts reported for received messages.
    pub bytes_received: AtomicU64,

    // ----- payload processing -----
    /// Compression operations attempted.
    pub compression_total: AtomicU64,
    /// Compression operations that succeeded.
    pub compression_success: AtomicU64,
    /// Encryption operations attempted.
    pub encryption_total: AtomicU64,
    /// Encryption operations that succeeded.
    pub encryption_success: AtomicU64,

    // ----- replay protection -----
    /// Lookups that found an entry in the replay cache.
    pub replay_cache_hits: AtomicU64,
    /// Lookups that did not find an entry in the replay cache.
    pub replay_cache_misses: AtomicU64,

    // ----- errors -----
    /// Connection-level errors.
    pub connection_errors: AtomicU64,
    /// Protocol-level errors.
    pub protocol_errors: AtomicU64,

    /// Moment the collector was created; uptime is measured from here.
    start_time: Instant,
}
52
53impl Metrics {
54    /// Create a new metrics collector
55    pub fn new() -> Self {
56        Self {
57            connections_total: AtomicU64::new(0),
58            connections_active: AtomicU64::new(0),
59            handshakes_total: AtomicU64::new(0),
60            handshakes_success: AtomicU64::new(0),
61            handshakes_failed: AtomicU64::new(0),
62            messages_sent: AtomicU64::new(0),
63            messages_received: AtomicU64::new(0),
64            bytes_sent: AtomicU64::new(0),
65            bytes_received: AtomicU64::new(0),
66            compression_total: AtomicU64::new(0),
67            compression_success: AtomicU64::new(0),
68            encryption_total: AtomicU64::new(0),
69            encryption_success: AtomicU64::new(0),
70            replay_cache_hits: AtomicU64::new(0),
71            replay_cache_misses: AtomicU64::new(0),
72            connection_errors: AtomicU64::new(0),
73            protocol_errors: AtomicU64::new(0),
74            start_time: Instant::now(),
75        }
76    }
77
78    /// Record a new connection
79    pub fn connection_established(&self) {
80        self.connections_total.fetch_add(1, Ordering::Relaxed);
81        self.connections_active.fetch_add(1, Ordering::Relaxed);
82    }
83
84    /// Record a connection closed
85    pub fn connection_closed(&self) {
86        self.connections_active.fetch_sub(1, Ordering::Relaxed);
87    }
88
89    /// Record a handshake attempt
90    pub fn handshake_attempt(&self) {
91        self.handshakes_total.fetch_add(1, Ordering::Relaxed);
92    }
93
94    /// Record a successful handshake
95    pub fn handshake_success(&self) {
96        self.handshakes_success.fetch_add(1, Ordering::Relaxed);
97    }
98
99    /// Record a failed handshake
100    pub fn handshake_failed(&self) {
101        self.handshakes_failed.fetch_add(1, Ordering::Relaxed);
102    }
103
104    /// Record a message sent
105    pub fn message_sent(&self, byte_count: u64) {
106        self.messages_sent.fetch_add(1, Ordering::Relaxed);
107        self.bytes_sent.fetch_add(byte_count, Ordering::Relaxed);
108    }
109
110    /// Record a message received
111    pub fn message_received(&self, byte_count: u64) {
112        self.messages_received.fetch_add(1, Ordering::Relaxed);
113        self.bytes_received.fetch_add(byte_count, Ordering::Relaxed);
114    }
115
116    /// Record a compression attempt
117    pub fn compression_attempt(&self) {
118        self.compression_total.fetch_add(1, Ordering::Relaxed);
119    }
120
121    /// Record a successful compression
122    pub fn compression_success(&self) {
123        self.compression_success.fetch_add(1, Ordering::Relaxed);
124    }
125
126    /// Record an encryption attempt
127    pub fn encryption_attempt(&self) {
128        self.encryption_total.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Record a successful encryption
132    pub fn encryption_success(&self) {
133        self.encryption_success.fetch_add(1, Ordering::Relaxed);
134    }
135
136    /// Record a replay cache hit
137    pub fn replay_cache_hit(&self) {
138        self.replay_cache_hits.fetch_add(1, Ordering::Relaxed);
139    }
140
141    /// Record a replay cache miss
142    pub fn replay_cache_miss(&self) {
143        self.replay_cache_misses.fetch_add(1, Ordering::Relaxed);
144    }
145
146    /// Record a connection error
147    pub fn connection_error(&self) {
148        self.connection_errors.fetch_add(1, Ordering::Relaxed);
149    }
150
151    /// Record a protocol error
152    pub fn protocol_error(&self) {
153        self.protocol_errors.fetch_add(1, Ordering::Relaxed);
154    }
155
156    /// Get current metrics snapshot
157    pub fn snapshot(&self) -> MetricsSnapshot {
158        MetricsSnapshot {
159            connections_total: self.connections_total.load(Ordering::Relaxed),
160            connections_active: self.connections_active.load(Ordering::Relaxed),
161            handshakes_total: self.handshakes_total.load(Ordering::Relaxed),
162            handshakes_success: self.handshakes_success.load(Ordering::Relaxed),
163            handshakes_failed: self.handshakes_failed.load(Ordering::Relaxed),
164            messages_sent: self.messages_sent.load(Ordering::Relaxed),
165            messages_received: self.messages_received.load(Ordering::Relaxed),
166            bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
167            bytes_received: self.bytes_received.load(Ordering::Relaxed),
168            compression_total: self.compression_total.load(Ordering::Relaxed),
169            compression_success: self.compression_success.load(Ordering::Relaxed),
170            encryption_total: self.encryption_total.load(Ordering::Relaxed),
171            encryption_success: self.encryption_success.load(Ordering::Relaxed),
172            replay_cache_hits: self.replay_cache_hits.load(Ordering::Relaxed),
173            replay_cache_misses: self.replay_cache_misses.load(Ordering::Relaxed),
174            connection_errors: self.connection_errors.load(Ordering::Relaxed),
175            protocol_errors: self.protocol_errors.load(Ordering::Relaxed),
176            uptime_seconds: self.start_time.elapsed().as_secs(),
177        }
178    }
179
180    /// Log current metrics
181    pub fn log_metrics(&self) {
182        let snapshot = self.snapshot();
183        info!(
184            connections_total = snapshot.connections_total,
185            connections_active = snapshot.connections_active,
186            handshakes_total = snapshot.handshakes_total,
187            handshakes_success = snapshot.handshakes_success,
188            handshakes_failed = snapshot.handshakes_failed,
189            messages_sent = snapshot.messages_sent,
190            messages_received = snapshot.messages_received,
191            bytes_sent = snapshot.bytes_sent,
192            bytes_received = snapshot.bytes_received,
193            compression_total = snapshot.compression_total,
194            compression_success = snapshot.compression_success,
195            encryption_total = snapshot.encryption_total,
196            encryption_success = snapshot.encryption_success,
197            replay_cache_hits = snapshot.replay_cache_hits,
198            replay_cache_misses = snapshot.replay_cache_misses,
199            connection_errors = snapshot.connection_errors,
200            protocol_errors = snapshot.protocol_errors,
201            uptime_seconds = snapshot.uptime_seconds,
202            "Protocol metrics snapshot"
203        );
204    }
205}
206
207impl Default for Metrics {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
/// Point-in-time copy of every counter in [`Metrics`], produced by
/// [`Metrics::snapshot`].
///
/// All fields are plain `u64` values, so a snapshot can be stored, cloned,
/// compared, or used as a map key without touching the live atomics.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct MetricsSnapshot {
    /// Connections established since the collector was created.
    pub connections_total: u64,
    /// Connections open at the time of the snapshot.
    pub connections_active: u64,
    /// Handshakes started.
    pub handshakes_total: u64,
    /// Handshakes that completed successfully.
    pub handshakes_success: u64,
    /// Handshakes that failed.
    pub handshakes_failed: u64,
    /// Messages sent.
    pub messages_sent: u64,
    /// Messages received.
    pub messages_received: u64,
    /// Sum of byte counts reported for sent messages.
    pub bytes_sent: u64,
    /// Sum of byte counts reported for received messages.
    pub bytes_received: u64,
    /// Compression operations attempted.
    pub compression_total: u64,
    /// Compression operations that succeeded.
    pub compression_success: u64,
    /// Encryption operations attempted.
    pub encryption_total: u64,
    /// Encryption operations that succeeded.
    pub encryption_success: u64,
    /// Replay cache hits.
    pub replay_cache_hits: u64,
    /// Replay cache misses.
    pub replay_cache_misses: u64,
    /// Connection-level errors.
    pub connection_errors: u64,
    /// Protocol-level errors.
    pub protocol_errors: u64,
    /// Whole seconds elapsed since the collector was created.
    pub uptime_seconds: u64,
}
235
236/// Global metrics instance (lazy static for simplicity)
237static METRICS: once_cell::sync::Lazy<Metrics> = once_cell::sync::Lazy::new(Metrics::new);
238
239/// Get the global metrics instance
240pub fn global_metrics() -> &'static Metrics {
241    &METRICS
242}
243
244/// Initialize metrics collection (call once at startup)
245pub fn init_metrics() {
246    // Force initialization
247    let _ = global_metrics();
248    info!("Metrics collection initialized");
249}
250
/// RAII timer that logs how long an operation took when it is dropped.
///
/// Create one with [`Timer::start`] and keep it alive for the whole
/// operation. Binding it to `_` (or not binding it at all) drops it
/// immediately, so nothing meaningful is measured.
#[derive(Debug)]
#[must_use = "the timer logs on drop; bind it to a named variable such as `_timer`"]
pub struct Timer {
    /// When the measured operation began.
    start: Instant,
    /// Static label identifying the operation in the log event.
    operation: &'static str,
}
256
257impl Timer {
258    /// Start timing an operation
259    pub fn start(operation: &'static str) -> Self {
260        Self {
261            start: Instant::now(),
262            operation,
263        }
264    }
265}
266
267impl Drop for Timer {
268    fn drop(&mut self) {
269        let duration = self.start.elapsed();
270        debug!(
271            operation = self.operation,
272            duration_ms = duration.as_millis(),
273            "Operation completed"
274        );
275    }
276}