ant_quic/logging/
metrics.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
//! Performance metrics collection and logging.
//!
//! Tracks and logs performance metrics for monitoring and optimization.
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15use super::{LogEvent, logger};
16use crate::{Duration, Instant};
17
18/// Metrics collector for performance tracking
19#[derive(Debug)]
20pub struct MetricsCollector {
21    /// Event counts by level and component
22    event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
23    /// Throughput metrics
24    throughput: Arc<ThroughputTracker>,
25    /// Latency metrics
26    latency: Arc<LatencyTracker>,
27    /// Connection metrics
28    connections: Arc<ConnectionMetrics>,
29}
30
31impl Default for MetricsCollector {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl MetricsCollector {
38    /// Create a new metrics collector
39    pub fn new() -> Self {
40        Self {
41            event_counts: Arc::new(Mutex::new(HashMap::new())),
42            throughput: Arc::new(ThroughputTracker::new()),
43            latency: Arc::new(LatencyTracker::new()),
44            connections: Arc::new(ConnectionMetrics::new()),
45        }
46    }
47
48    /// Record a log event for metrics
49    pub fn record_event(&self, event: &LogEvent) {
50        if let Ok(mut counts) = self.event_counts.lock() {
51            let key = (event.level, event.target.clone());
52            *counts.entry(key).or_insert(0) += 1;
53        }
54    }
55
56    /// Get a summary of collected metrics
57    pub fn summary(&self) -> MetricsSummary {
58        let event_counts = self
59            .event_counts
60            .lock()
61            .map(|counts| counts.clone())
62            .unwrap_or_default();
63
64        MetricsSummary {
65            event_counts,
66            throughput: self.throughput.summary(),
67            latency: self.latency.summary(),
68            connections: self.connections.summary(),
69        }
70    }
71}
72
73/// Throughput tracking
74#[derive(Debug)]
75pub struct ThroughputTracker {
76    bytes_sent: AtomicU64,
77    bytes_received: AtomicU64,
78    packets_sent: AtomicU64,
79    packets_received: AtomicU64,
80    start_time: Instant,
81}
82
83impl Default for ThroughputTracker {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl ThroughputTracker {
90    /// Create a new throughput tracker
91    pub fn new() -> Self {
92        Self {
93            bytes_sent: AtomicU64::new(0),
94            bytes_received: AtomicU64::new(0),
95            packets_sent: AtomicU64::new(0),
96            packets_received: AtomicU64::new(0),
97            start_time: Instant::now(),
98        }
99    }
100
101    /// Record bytes sent and increment packet count
102    pub fn record_sent(&self, bytes: u64) {
103        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
104        self.packets_sent.fetch_add(1, Ordering::Relaxed);
105    }
106
107    /// Record bytes received and increment packet count
108    pub fn record_received(&self, bytes: u64) {
109        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
110        self.packets_received.fetch_add(1, Ordering::Relaxed);
111    }
112
113    /// Produce a summary snapshot of throughput metrics
114    pub fn summary(&self) -> ThroughputSummary {
115        let duration = self.start_time.elapsed();
116        let duration_secs = duration.as_secs_f64();
117
118        let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
119        let bytes_received = self.bytes_received.load(Ordering::Relaxed);
120
121        ThroughputSummary {
122            bytes_sent,
123            bytes_received,
124            packets_sent: self.packets_sent.load(Ordering::Relaxed),
125            packets_received: self.packets_received.load(Ordering::Relaxed),
126            duration,
127            send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
128            recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
129        }
130    }
131}
132
133/// Latency tracking
134#[derive(Debug)]
135pub struct LatencyTracker {
136    samples: Arc<Mutex<Vec<Duration>>>,
137    min_rtt: AtomicU64, // microseconds
138    max_rtt: AtomicU64, // microseconds
139    sum_rtt: AtomicU64, // microseconds
140    count: AtomicU64,
141}
142
143impl Default for LatencyTracker {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
149impl LatencyTracker {
150    /// Create a new latency tracker
151    pub fn new() -> Self {
152        Self {
153            samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
154            min_rtt: AtomicU64::new(u64::MAX),
155            max_rtt: AtomicU64::new(0),
156            sum_rtt: AtomicU64::new(0),
157            count: AtomicU64::new(0),
158        }
159    }
160
161    /// Record a round-trip time measurement
162    pub fn record_rtt(&self, rtt: Duration) {
163        let micros = rtt.as_micros() as u64;
164
165        // Update min
166        let mut current_min = self.min_rtt.load(Ordering::Relaxed);
167        while micros < current_min {
168            match self.min_rtt.compare_exchange_weak(
169                current_min,
170                micros,
171                Ordering::Relaxed,
172                Ordering::Relaxed,
173            ) {
174                Ok(_) => break,
175                Err(x) => current_min = x,
176            }
177        }
178
179        // Update max
180        let mut current_max = self.max_rtt.load(Ordering::Relaxed);
181        while micros > current_max {
182            match self.max_rtt.compare_exchange_weak(
183                current_max,
184                micros,
185                Ordering::Relaxed,
186                Ordering::Relaxed,
187            ) {
188                Ok(_) => break,
189                Err(x) => current_max = x,
190            }
191        }
192
193        // Update sum and count
194        self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
195        self.count.fetch_add(1, Ordering::Relaxed);
196
197        // Store sample
198        if let Ok(mut samples) = self.samples.lock() {
199            if samples.len() < 1000 {
200                samples.push(rtt);
201            }
202        }
203    }
204
205    /// Produce a summary snapshot of latency metrics
206    pub fn summary(&self) -> LatencySummary {
207        let count = self.count.load(Ordering::Relaxed);
208        let min_rtt = self.min_rtt.load(Ordering::Relaxed);
209
210        LatencySummary {
211            min_rtt: if min_rtt == u64::MAX {
212                Duration::from_micros(0)
213            } else {
214                Duration::from_micros(min_rtt)
215            },
216            max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
217            avg_rtt: if count > 0 {
218                Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
219            } else {
220                Duration::from_micros(0)
221            },
222            sample_count: count,
223        }
224    }
225}
226
227/// Connection metrics
228#[derive(Debug)]
229pub struct ConnectionMetrics {
230    active_connections: AtomicUsize,
231    total_connections: AtomicU64,
232    failed_connections: AtomicU64,
233    migrated_connections: AtomicU64,
234}
235
236impl Default for ConnectionMetrics {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242impl ConnectionMetrics {
243    /// Create a new connection metrics tracker
244    pub fn new() -> Self {
245        Self {
246            active_connections: AtomicUsize::new(0),
247            total_connections: AtomicU64::new(0),
248            failed_connections: AtomicU64::new(0),
249            migrated_connections: AtomicU64::new(0),
250        }
251    }
252
253    /// Record that a connection was opened
254    pub fn connection_opened(&self) {
255        self.active_connections.fetch_add(1, Ordering::Relaxed);
256        self.total_connections.fetch_add(1, Ordering::Relaxed);
257    }
258
259    /// Record that a connection was closed
260    pub fn connection_closed(&self) {
261        self.active_connections.fetch_sub(1, Ordering::Relaxed);
262    }
263
264    /// Record a connection failure
265    pub fn connection_failed(&self) {
266        self.failed_connections.fetch_add(1, Ordering::Relaxed);
267    }
268
269    /// Record a connection path migration
270    pub fn connection_migrated(&self) {
271        self.migrated_connections.fetch_add(1, Ordering::Relaxed);
272    }
273
274    /// Generate a snapshot summary of current connection metrics
275    pub fn summary(&self) -> ConnectionMetricsSummary {
276        ConnectionMetricsSummary {
277            active_connections: self.active_connections.load(Ordering::Relaxed),
278            total_connections: self.total_connections.load(Ordering::Relaxed),
279            failed_connections: self.failed_connections.load(Ordering::Relaxed),
280            migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
281        }
282    }
283}
284
285/// Metrics summary
286#[derive(Debug, Clone)]
287pub struct MetricsSummary {
288    /// Counts of log events by level and target
289    pub event_counts: HashMap<(tracing::Level, String), u64>,
290    /// Aggregate throughput statistics
291    pub throughput: ThroughputSummary,
292    /// Aggregate latency statistics
293    pub latency: LatencySummary,
294    /// Aggregate connection lifecycle statistics
295    pub connections: ConnectionMetricsSummary,
296}
297
/// Caller-supplied throughput figures, consumed by [`log_throughput_metrics`].
#[derive(Clone, Debug)]
pub struct ThroughputMetrics {
    /// Bytes sent during the window
    pub bytes_sent: u64,
    /// Bytes received during the window
    pub bytes_received: u64,
    /// Length of the measurement window
    pub duration: Duration,
    /// Packets sent during the window
    pub packets_sent: u64,
    /// Packets received during the window
    pub packets_received: u64,
}
312
/// Throughput snapshot produced by [`ThroughputTracker::summary`].
#[derive(Clone, Debug)]
pub struct ThroughputSummary {
    /// Cumulative bytes sent
    pub bytes_sent: u64,
    /// Cumulative bytes received
    pub bytes_received: u64,
    /// Cumulative packets sent
    pub packets_sent: u64,
    /// Cumulative packets received
    pub packets_received: u64,
    /// Time covered by the snapshot
    pub duration: Duration,
    /// Outbound rate in megabits per second
    pub send_rate_mbps: f64,
    /// Inbound rate in megabits per second
    pub recv_rate_mbps: f64,
}
331
/// Caller-supplied RTT figures, consumed by [`log_latency_metrics`].
#[derive(Clone, Debug)]
pub struct LatencyMetrics {
    /// Most recent RTT sample
    pub rtt: Duration,
    /// Lowest RTT observed
    pub min_rtt: Duration,
    /// Highest RTT observed
    pub max_rtt: Duration,
    /// Smoothed RTT estimate
    pub smoothed_rtt: Duration,
}
344
/// Latency snapshot produced by [`LatencyTracker::summary`].
#[derive(Clone, Debug)]
pub struct LatencySummary {
    /// Lowest RTT recorded
    pub min_rtt: Duration,
    /// Highest RTT recorded
    pub max_rtt: Duration,
    /// Mean RTT across all recorded samples
    pub avg_rtt: Duration,
    /// How many RTT samples were aggregated
    pub sample_count: u64,
}
357
/// Connection snapshot produced by [`ConnectionMetrics::summary`].
#[derive(Clone, Debug)]
pub struct ConnectionMetricsSummary {
    /// Connections currently open
    pub active_connections: usize,
    /// Connections ever opened
    pub total_connections: u64,
    /// Connection attempts that failed
    pub failed_connections: u64,
    /// Connections that switched to a new network path
    pub migrated_connections: u64,
}
370
371/// Log throughput metrics
372pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
373    let duration_secs = metrics.duration.as_secs_f64();
374    let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
375    let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
376
377    let mut fields = HashMap::new();
378    fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
379    fields.insert(
380        "bytes_received".to_string(),
381        metrics.bytes_received.to_string(),
382    );
383    fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
384    fields.insert(
385        "packets_received".to_string(),
386        metrics.packets_received.to_string(),
387    );
388    fields.insert(
389        "duration_ms".to_string(),
390        metrics.duration.as_millis().to_string(),
391    );
392    fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
393    fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
394
395    logger().log_event(LogEvent {
396        timestamp: Instant::now(),
397        level: tracing::Level::INFO,
398        target: "ant_quic::metrics::throughput".to_string(),
399        message: "throughput_metrics".to_string(),
400        fields,
401        span_id: None,
402    });
403}
404
405/// Log latency metrics
406pub fn log_latency_metrics(metrics: &LatencyMetrics) {
407    let mut fields = HashMap::new();
408    fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
409    fields.insert(
410        "min_rtt_ms".to_string(),
411        metrics.min_rtt.as_millis().to_string(),
412    );
413    fields.insert(
414        "max_rtt_ms".to_string(),
415        metrics.max_rtt.as_millis().to_string(),
416    );
417    fields.insert(
418        "smoothed_rtt_ms".to_string(),
419        metrics.smoothed_rtt.as_millis().to_string(),
420    );
421
422    logger().log_event(LogEvent {
423        timestamp: Instant::now(),
424        level: tracing::Level::INFO,
425        target: "ant_quic::metrics::latency".to_string(),
426        message: "latency_metrics".to_string(),
427        fields,
428        span_id: None,
429    });
430}