ant_quic/logging/
metrics.rs

//! Performance metrics collection and logging.
//!
//! Tracks and logs performance metrics for monitoring and optimization.
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7
8use super::{LogEvent, logger};
9use crate::{Duration, Instant};
10
11/// Metrics collector for performance tracking
12#[derive(Debug)]
13pub struct MetricsCollector {
14    /// Event counts by level and component
15    event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
16    /// Throughput metrics
17    throughput: Arc<ThroughputTracker>,
18    /// Latency metrics
19    latency: Arc<LatencyTracker>,
20    /// Connection metrics
21    connections: Arc<ConnectionMetrics>,
22}
23
24impl Default for MetricsCollector {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl MetricsCollector {
31    /// Create a new metrics collector
32    pub fn new() -> Self {
33        Self {
34            event_counts: Arc::new(Mutex::new(HashMap::new())),
35            throughput: Arc::new(ThroughputTracker::new()),
36            latency: Arc::new(LatencyTracker::new()),
37            connections: Arc::new(ConnectionMetrics::new()),
38        }
39    }
40
41    /// Record a log event for metrics
42    pub fn record_event(&self, event: &LogEvent) {
43        if let Ok(mut counts) = self.event_counts.lock() {
44            let key = (event.level, event.target.clone());
45            *counts.entry(key).or_insert(0) += 1;
46        }
47    }
48
49    /// Get a summary of collected metrics
50    pub fn summary(&self) -> MetricsSummary {
51        let event_counts = self
52            .event_counts
53            .lock()
54            .map(|counts| counts.clone())
55            .unwrap_or_default();
56
57        MetricsSummary {
58            event_counts,
59            throughput: self.throughput.summary(),
60            latency: self.latency.summary(),
61            connections: self.connections.summary(),
62        }
63    }
64}
65
66/// Throughput tracking
67#[derive(Debug)]
68pub struct ThroughputTracker {
69    bytes_sent: AtomicU64,
70    bytes_received: AtomicU64,
71    packets_sent: AtomicU64,
72    packets_received: AtomicU64,
73    start_time: Instant,
74}
75
76impl Default for ThroughputTracker {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82impl ThroughputTracker {
83    pub fn new() -> Self {
84        Self {
85            bytes_sent: AtomicU64::new(0),
86            bytes_received: AtomicU64::new(0),
87            packets_sent: AtomicU64::new(0),
88            packets_received: AtomicU64::new(0),
89            start_time: Instant::now(),
90        }
91    }
92
93    pub fn record_sent(&self, bytes: u64) {
94        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
95        self.packets_sent.fetch_add(1, Ordering::Relaxed);
96    }
97
98    pub fn record_received(&self, bytes: u64) {
99        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
100        self.packets_received.fetch_add(1, Ordering::Relaxed);
101    }
102
103    pub fn summary(&self) -> ThroughputSummary {
104        let duration = self.start_time.elapsed();
105        let duration_secs = duration.as_secs_f64();
106
107        let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
108        let bytes_received = self.bytes_received.load(Ordering::Relaxed);
109
110        ThroughputSummary {
111            bytes_sent,
112            bytes_received,
113            packets_sent: self.packets_sent.load(Ordering::Relaxed),
114            packets_received: self.packets_received.load(Ordering::Relaxed),
115            duration,
116            send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
117            recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
118        }
119    }
120}
121
122/// Latency tracking
123#[derive(Debug)]
124pub struct LatencyTracker {
125    samples: Arc<Mutex<Vec<Duration>>>,
126    min_rtt: AtomicU64, // microseconds
127    max_rtt: AtomicU64, // microseconds
128    sum_rtt: AtomicU64, // microseconds
129    count: AtomicU64,
130}
131
132impl Default for LatencyTracker {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138impl LatencyTracker {
139    pub fn new() -> Self {
140        Self {
141            samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
142            min_rtt: AtomicU64::new(u64::MAX),
143            max_rtt: AtomicU64::new(0),
144            sum_rtt: AtomicU64::new(0),
145            count: AtomicU64::new(0),
146        }
147    }
148
149    pub fn record_rtt(&self, rtt: Duration) {
150        let micros = rtt.as_micros() as u64;
151
152        // Update min
153        let mut current_min = self.min_rtt.load(Ordering::Relaxed);
154        while micros < current_min {
155            match self.min_rtt.compare_exchange_weak(
156                current_min,
157                micros,
158                Ordering::Relaxed,
159                Ordering::Relaxed,
160            ) {
161                Ok(_) => break,
162                Err(x) => current_min = x,
163            }
164        }
165
166        // Update max
167        let mut current_max = self.max_rtt.load(Ordering::Relaxed);
168        while micros > current_max {
169            match self.max_rtt.compare_exchange_weak(
170                current_max,
171                micros,
172                Ordering::Relaxed,
173                Ordering::Relaxed,
174            ) {
175                Ok(_) => break,
176                Err(x) => current_max = x,
177            }
178        }
179
180        // Update sum and count
181        self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
182        self.count.fetch_add(1, Ordering::Relaxed);
183
184        // Store sample
185        if let Ok(mut samples) = self.samples.lock() {
186            if samples.len() < 1000 {
187                samples.push(rtt);
188            }
189        }
190    }
191
192    pub fn summary(&self) -> LatencySummary {
193        let count = self.count.load(Ordering::Relaxed);
194        let min_rtt = self.min_rtt.load(Ordering::Relaxed);
195
196        LatencySummary {
197            min_rtt: if min_rtt == u64::MAX {
198                Duration::from_micros(0)
199            } else {
200                Duration::from_micros(min_rtt)
201            },
202            max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
203            avg_rtt: if count > 0 {
204                Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
205            } else {
206                Duration::from_micros(0)
207            },
208            sample_count: count,
209        }
210    }
211}
212
213/// Connection metrics
214#[derive(Debug)]
215pub struct ConnectionMetrics {
216    active_connections: AtomicUsize,
217    total_connections: AtomicU64,
218    failed_connections: AtomicU64,
219    migrated_connections: AtomicU64,
220}
221
222impl Default for ConnectionMetrics {
223    fn default() -> Self {
224        Self::new()
225    }
226}
227
228impl ConnectionMetrics {
229    pub fn new() -> Self {
230        Self {
231            active_connections: AtomicUsize::new(0),
232            total_connections: AtomicU64::new(0),
233            failed_connections: AtomicU64::new(0),
234            migrated_connections: AtomicU64::new(0),
235        }
236    }
237
238    pub fn connection_opened(&self) {
239        self.active_connections.fetch_add(1, Ordering::Relaxed);
240        self.total_connections.fetch_add(1, Ordering::Relaxed);
241    }
242
243    pub fn connection_closed(&self) {
244        self.active_connections.fetch_sub(1, Ordering::Relaxed);
245    }
246
247    pub fn connection_failed(&self) {
248        self.failed_connections.fetch_add(1, Ordering::Relaxed);
249    }
250
251    pub fn connection_migrated(&self) {
252        self.migrated_connections.fetch_add(1, Ordering::Relaxed);
253    }
254
255    pub fn summary(&self) -> ConnectionMetricsSummary {
256        ConnectionMetricsSummary {
257            active_connections: self.active_connections.load(Ordering::Relaxed),
258            total_connections: self.total_connections.load(Ordering::Relaxed),
259            failed_connections: self.failed_connections.load(Ordering::Relaxed),
260            migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
261        }
262    }
263}
264
265/// Metrics summary
266#[derive(Debug, Clone)]
267pub struct MetricsSummary {
268    pub event_counts: HashMap<(tracing::Level, String), u64>,
269    pub throughput: ThroughputSummary,
270    pub latency: LatencySummary,
271    pub connections: ConnectionMetricsSummary,
272}
273
/// Input record for `log_throughput_metrics`: transfer totals together with
/// the time span they cover.
#[derive(Clone, Debug)]
pub struct ThroughputMetrics {
    /// Bytes sent during `duration`.
    pub bytes_sent: u64,
    /// Bytes received during `duration`.
    pub bytes_received: u64,
    /// Time span used as the divisor when rates are derived.
    pub duration: Duration,
    /// Packets sent.
    pub packets_sent: u64,
    /// Packets received.
    pub packets_received: u64,
}
283
/// Snapshot produced by `ThroughputTracker::summary`.
#[derive(Clone, Debug)]
pub struct ThroughputSummary {
    /// Total bytes recorded as sent.
    pub bytes_sent: u64,
    /// Total bytes recorded as received.
    pub bytes_received: u64,
    /// Total packets recorded as sent.
    pub packets_sent: u64,
    /// Total packets recorded as received.
    pub packets_received: u64,
    /// Time elapsed since the tracker was created.
    pub duration: Duration,
    /// Send rate over `duration`, in megabits per second.
    pub send_rate_mbps: f64,
    /// Receive rate over `duration`, in megabits per second.
    pub recv_rate_mbps: f64,
}
295
/// Input record for `log_latency_metrics`; each field is logged in whole
/// milliseconds under a matching `*_ms` key.
#[derive(Clone, Debug)]
pub struct LatencyMetrics {
    /// Logged as `rtt_ms`.
    pub rtt: Duration,
    /// Logged as `min_rtt_ms`.
    pub min_rtt: Duration,
    /// Logged as `max_rtt_ms`.
    pub max_rtt: Duration,
    /// Logged as `smoothed_rtt_ms`.
    pub smoothed_rtt: Duration,
}
304
/// Snapshot produced by `LatencyTracker::summary`.
#[derive(Clone, Debug)]
pub struct LatencySummary {
    /// Smallest recorded RTT (zero when nothing was recorded).
    pub min_rtt: Duration,
    /// Largest recorded RTT.
    pub max_rtt: Duration,
    /// Mean of the recorded RTTs (zero when nothing was recorded).
    pub avg_rtt: Duration,
    /// Number of RTTs recorded.
    pub sample_count: u64,
}
313
/// Snapshot produced by `ConnectionMetrics::summary`.
#[derive(Clone, Debug)]
pub struct ConnectionMetricsSummary {
    /// Connections opened and not yet closed.
    pub active_connections: usize,
    /// Connections opened in total.
    pub total_connections: u64,
    /// Connection failures recorded.
    pub failed_connections: u64,
    /// Connection migrations recorded.
    pub migrated_connections: u64,
}
322
323/// Log throughput metrics
324pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
325    let duration_secs = metrics.duration.as_secs_f64();
326    let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
327    let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
328
329    let mut fields = HashMap::new();
330    fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
331    fields.insert(
332        "bytes_received".to_string(),
333        metrics.bytes_received.to_string(),
334    );
335    fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
336    fields.insert(
337        "packets_received".to_string(),
338        metrics.packets_received.to_string(),
339    );
340    fields.insert(
341        "duration_ms".to_string(),
342        metrics.duration.as_millis().to_string(),
343    );
344    fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
345    fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
346
347    logger().log_event(LogEvent {
348        timestamp: Instant::now(),
349        level: tracing::Level::INFO,
350        target: "ant_quic::metrics::throughput".to_string(),
351        message: "throughput_metrics".to_string(),
352        fields,
353        span_id: None,
354    });
355}
356
357/// Log latency metrics
358pub fn log_latency_metrics(metrics: &LatencyMetrics) {
359    let mut fields = HashMap::new();
360    fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
361    fields.insert(
362        "min_rtt_ms".to_string(),
363        metrics.min_rtt.as_millis().to_string(),
364    );
365    fields.insert(
366        "max_rtt_ms".to_string(),
367        metrics.max_rtt.as_millis().to_string(),
368    );
369    fields.insert(
370        "smoothed_rtt_ms".to_string(),
371        metrics.smoothed_rtt.as_millis().to_string(),
372    );
373
374    logger().log_event(LogEvent {
375        timestamp: Instant::now(),
376        level: tracing::Level::INFO,
377        target: "ant_quic::metrics::latency".to_string(),
378        message: "latency_metrics".to_string(),
379        fields,
380        span_id: None,
381    });
382}