ant_quic/logging/
metrics.rs

1/// Performance metrics collection and logging
2///
3/// Tracks and logs performance metrics for monitoring and optimization
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7
8use super::{LogEvent, logger};
9use crate::{Duration, Instant};
10
11/// Metrics collector for performance tracking
12pub struct MetricsCollector {
13    /// Event counts by level and component
14    event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
15    /// Throughput metrics
16    throughput: Arc<ThroughputTracker>,
17    /// Latency metrics
18    latency: Arc<LatencyTracker>,
19    /// Connection metrics
20    connections: Arc<ConnectionMetrics>,
21}
22
23impl Default for MetricsCollector {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl MetricsCollector {
30    /// Create a new metrics collector
31    pub fn new() -> Self {
32        Self {
33            event_counts: Arc::new(Mutex::new(HashMap::new())),
34            throughput: Arc::new(ThroughputTracker::new()),
35            latency: Arc::new(LatencyTracker::new()),
36            connections: Arc::new(ConnectionMetrics::new()),
37        }
38    }
39
40    /// Record a log event for metrics
41    pub fn record_event(&self, event: &LogEvent) {
42        if let Ok(mut counts) = self.event_counts.lock() {
43            let key = (event.level, event.target.clone());
44            *counts.entry(key).or_insert(0) += 1;
45        }
46    }
47
48    /// Get a summary of collected metrics
49    pub fn summary(&self) -> MetricsSummary {
50        let event_counts = self
51            .event_counts
52            .lock()
53            .map(|counts| counts.clone())
54            .unwrap_or_default();
55
56        MetricsSummary {
57            event_counts,
58            throughput: self.throughput.summary(),
59            latency: self.latency.summary(),
60            connections: self.connections.summary(),
61        }
62    }
63}
64
65/// Throughput tracking
66pub struct ThroughputTracker {
67    bytes_sent: AtomicU64,
68    bytes_received: AtomicU64,
69    packets_sent: AtomicU64,
70    packets_received: AtomicU64,
71    start_time: Instant,
72}
73
74impl Default for ThroughputTracker {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl ThroughputTracker {
81    pub fn new() -> Self {
82        Self {
83            bytes_sent: AtomicU64::new(0),
84            bytes_received: AtomicU64::new(0),
85            packets_sent: AtomicU64::new(0),
86            packets_received: AtomicU64::new(0),
87            start_time: Instant::now(),
88        }
89    }
90
91    pub fn record_sent(&self, bytes: u64) {
92        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
93        self.packets_sent.fetch_add(1, Ordering::Relaxed);
94    }
95
96    pub fn record_received(&self, bytes: u64) {
97        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
98        self.packets_received.fetch_add(1, Ordering::Relaxed);
99    }
100
101    pub fn summary(&self) -> ThroughputSummary {
102        let duration = self.start_time.elapsed();
103        let duration_secs = duration.as_secs_f64();
104
105        let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
106        let bytes_received = self.bytes_received.load(Ordering::Relaxed);
107
108        ThroughputSummary {
109            bytes_sent,
110            bytes_received,
111            packets_sent: self.packets_sent.load(Ordering::Relaxed),
112            packets_received: self.packets_received.load(Ordering::Relaxed),
113            duration,
114            send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
115            recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
116        }
117    }
118}
119
120/// Latency tracking
121pub struct LatencyTracker {
122    samples: Arc<Mutex<Vec<Duration>>>,
123    min_rtt: AtomicU64, // microseconds
124    max_rtt: AtomicU64, // microseconds
125    sum_rtt: AtomicU64, // microseconds
126    count: AtomicU64,
127}
128
129impl Default for LatencyTracker {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl LatencyTracker {
136    pub fn new() -> Self {
137        Self {
138            samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
139            min_rtt: AtomicU64::new(u64::MAX),
140            max_rtt: AtomicU64::new(0),
141            sum_rtt: AtomicU64::new(0),
142            count: AtomicU64::new(0),
143        }
144    }
145
146    pub fn record_rtt(&self, rtt: Duration) {
147        let micros = rtt.as_micros() as u64;
148
149        // Update min
150        let mut current_min = self.min_rtt.load(Ordering::Relaxed);
151        while micros < current_min {
152            match self.min_rtt.compare_exchange_weak(
153                current_min,
154                micros,
155                Ordering::Relaxed,
156                Ordering::Relaxed,
157            ) {
158                Ok(_) => break,
159                Err(x) => current_min = x,
160            }
161        }
162
163        // Update max
164        let mut current_max = self.max_rtt.load(Ordering::Relaxed);
165        while micros > current_max {
166            match self.max_rtt.compare_exchange_weak(
167                current_max,
168                micros,
169                Ordering::Relaxed,
170                Ordering::Relaxed,
171            ) {
172                Ok(_) => break,
173                Err(x) => current_max = x,
174            }
175        }
176
177        // Update sum and count
178        self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
179        self.count.fetch_add(1, Ordering::Relaxed);
180
181        // Store sample
182        if let Ok(mut samples) = self.samples.lock() {
183            if samples.len() < 1000 {
184                samples.push(rtt);
185            }
186        }
187    }
188
189    pub fn summary(&self) -> LatencySummary {
190        let count = self.count.load(Ordering::Relaxed);
191        let min_rtt = self.min_rtt.load(Ordering::Relaxed);
192
193        LatencySummary {
194            min_rtt: if min_rtt == u64::MAX {
195                Duration::from_micros(0)
196            } else {
197                Duration::from_micros(min_rtt)
198            },
199            max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
200            avg_rtt: if count > 0 {
201                Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
202            } else {
203                Duration::from_micros(0)
204            },
205            sample_count: count,
206        }
207    }
208}
209
210/// Connection metrics
211pub struct ConnectionMetrics {
212    active_connections: AtomicUsize,
213    total_connections: AtomicU64,
214    failed_connections: AtomicU64,
215    migrated_connections: AtomicU64,
216}
217
218impl Default for ConnectionMetrics {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224impl ConnectionMetrics {
225    pub fn new() -> Self {
226        Self {
227            active_connections: AtomicUsize::new(0),
228            total_connections: AtomicU64::new(0),
229            failed_connections: AtomicU64::new(0),
230            migrated_connections: AtomicU64::new(0),
231        }
232    }
233
234    pub fn connection_opened(&self) {
235        self.active_connections.fetch_add(1, Ordering::Relaxed);
236        self.total_connections.fetch_add(1, Ordering::Relaxed);
237    }
238
239    pub fn connection_closed(&self) {
240        self.active_connections.fetch_sub(1, Ordering::Relaxed);
241    }
242
243    pub fn connection_failed(&self) {
244        self.failed_connections.fetch_add(1, Ordering::Relaxed);
245    }
246
247    pub fn connection_migrated(&self) {
248        self.migrated_connections.fetch_add(1, Ordering::Relaxed);
249    }
250
251    pub fn summary(&self) -> ConnectionMetricsSummary {
252        ConnectionMetricsSummary {
253            active_connections: self.active_connections.load(Ordering::Relaxed),
254            total_connections: self.total_connections.load(Ordering::Relaxed),
255            failed_connections: self.failed_connections.load(Ordering::Relaxed),
256            migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
257        }
258    }
259}
260
261/// Metrics summary
262#[derive(Debug, Clone)]
263pub struct MetricsSummary {
264    pub event_counts: HashMap<(tracing::Level, String), u64>,
265    pub throughput: ThroughputSummary,
266    pub latency: LatencySummary,
267    pub connections: ConnectionMetricsSummary,
268}
269
270/// Throughput metrics
271#[derive(Debug, Clone)]
272pub struct ThroughputMetrics {
273    pub bytes_sent: u64,
274    pub bytes_received: u64,
275    pub duration: Duration,
276    pub packets_sent: u64,
277    pub packets_received: u64,
278}
279
280/// Throughput summary
281#[derive(Debug, Clone)]
282pub struct ThroughputSummary {
283    pub bytes_sent: u64,
284    pub bytes_received: u64,
285    pub packets_sent: u64,
286    pub packets_received: u64,
287    pub duration: Duration,
288    pub send_rate_mbps: f64,
289    pub recv_rate_mbps: f64,
290}
291
292/// Latency metrics
293#[derive(Debug, Clone)]
294pub struct LatencyMetrics {
295    pub rtt: Duration,
296    pub min_rtt: Duration,
297    pub max_rtt: Duration,
298    pub smoothed_rtt: Duration,
299}
300
301/// Latency summary
302#[derive(Debug, Clone)]
303pub struct LatencySummary {
304    pub min_rtt: Duration,
305    pub max_rtt: Duration,
306    pub avg_rtt: Duration,
307    pub sample_count: u64,
308}
309
310/// Connection metrics summary
311#[derive(Debug, Clone)]
312pub struct ConnectionMetricsSummary {
313    pub active_connections: usize,
314    pub total_connections: u64,
315    pub failed_connections: u64,
316    pub migrated_connections: u64,
317}
318
319/// Log throughput metrics
320pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
321    let duration_secs = metrics.duration.as_secs_f64();
322    let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
323    let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
324
325    let mut fields = HashMap::new();
326    fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
327    fields.insert(
328        "bytes_received".to_string(),
329        metrics.bytes_received.to_string(),
330    );
331    fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
332    fields.insert(
333        "packets_received".to_string(),
334        metrics.packets_received.to_string(),
335    );
336    fields.insert(
337        "duration_ms".to_string(),
338        metrics.duration.as_millis().to_string(),
339    );
340    fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
341    fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
342
343    logger().log_event(LogEvent {
344        timestamp: Instant::now(),
345        level: tracing::Level::INFO,
346        target: "ant_quic::metrics::throughput".to_string(),
347        message: "throughput_metrics".to_string(),
348        fields,
349        span_id: None,
350    });
351}
352
353/// Log latency metrics
354pub fn log_latency_metrics(metrics: &LatencyMetrics) {
355    let mut fields = HashMap::new();
356    fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
357    fields.insert(
358        "min_rtt_ms".to_string(),
359        metrics.min_rtt.as_millis().to_string(),
360    );
361    fields.insert(
362        "max_rtt_ms".to_string(),
363        metrics.max_rtt.as_millis().to_string(),
364    );
365    fields.insert(
366        "smoothed_rtt_ms".to_string(),
367        metrics.smoothed_rtt.as_millis().to_string(),
368    );
369
370    logger().log_event(LogEvent {
371        timestamp: Instant::now(),
372        level: tracing::Level::INFO,
373        target: "ant_quic::metrics::latency".to_string(),
374        message: "latency_metrics".to_string(),
375        fields,
376        span_id: None,
377    });
378}