1use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15use super::{LogEvent, logger};
16use crate::{Duration, Instant};
17
18#[derive(Debug)]
20pub struct MetricsCollector {
21 event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
23 throughput: Arc<ThroughputTracker>,
25 latency: Arc<LatencyTracker>,
27 connections: Arc<ConnectionMetrics>,
29}
30
31impl Default for MetricsCollector {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl MetricsCollector {
38 pub fn new() -> Self {
40 Self {
41 event_counts: Arc::new(Mutex::new(HashMap::new())),
42 throughput: Arc::new(ThroughputTracker::new()),
43 latency: Arc::new(LatencyTracker::new()),
44 connections: Arc::new(ConnectionMetrics::new()),
45 }
46 }
47
48 pub fn record_event(&self, event: &LogEvent) {
50 if let Ok(mut counts) = self.event_counts.lock() {
51 let key = (event.level, event.target.clone());
52 *counts.entry(key).or_insert(0) += 1;
53 }
54 }
55
56 pub fn summary(&self) -> MetricsSummary {
58 let event_counts = self
59 .event_counts
60 .lock()
61 .map(|counts| counts.clone())
62 .unwrap_or_default();
63
64 MetricsSummary {
65 event_counts,
66 throughput: self.throughput.summary(),
67 latency: self.latency.summary(),
68 connections: self.connections.summary(),
69 }
70 }
71}
72
73#[derive(Debug)]
75pub struct ThroughputTracker {
76 bytes_sent: AtomicU64,
77 bytes_received: AtomicU64,
78 packets_sent: AtomicU64,
79 packets_received: AtomicU64,
80 start_time: Instant,
81}
82
83impl Default for ThroughputTracker {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89impl ThroughputTracker {
90 pub fn new() -> Self {
92 Self {
93 bytes_sent: AtomicU64::new(0),
94 bytes_received: AtomicU64::new(0),
95 packets_sent: AtomicU64::new(0),
96 packets_received: AtomicU64::new(0),
97 start_time: Instant::now(),
98 }
99 }
100
101 pub fn record_sent(&self, bytes: u64) {
103 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
104 self.packets_sent.fetch_add(1, Ordering::Relaxed);
105 }
106
107 pub fn record_received(&self, bytes: u64) {
109 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
110 self.packets_received.fetch_add(1, Ordering::Relaxed);
111 }
112
113 pub fn summary(&self) -> ThroughputSummary {
115 let duration = self.start_time.elapsed();
116 let duration_secs = duration.as_secs_f64();
117
118 let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
119 let bytes_received = self.bytes_received.load(Ordering::Relaxed);
120
121 ThroughputSummary {
122 bytes_sent,
123 bytes_received,
124 packets_sent: self.packets_sent.load(Ordering::Relaxed),
125 packets_received: self.packets_received.load(Ordering::Relaxed),
126 duration,
127 send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
128 recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
129 }
130 }
131}
132
133#[derive(Debug)]
135pub struct LatencyTracker {
136 samples: Arc<Mutex<Vec<Duration>>>,
137 min_rtt: AtomicU64, max_rtt: AtomicU64, sum_rtt: AtomicU64, count: AtomicU64,
141}
142
143impl Default for LatencyTracker {
144 fn default() -> Self {
145 Self::new()
146 }
147}
148
149impl LatencyTracker {
150 pub fn new() -> Self {
152 Self {
153 samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
154 min_rtt: AtomicU64::new(u64::MAX),
155 max_rtt: AtomicU64::new(0),
156 sum_rtt: AtomicU64::new(0),
157 count: AtomicU64::new(0),
158 }
159 }
160
161 pub fn record_rtt(&self, rtt: Duration) {
163 let micros = rtt.as_micros() as u64;
164
165 let mut current_min = self.min_rtt.load(Ordering::Relaxed);
167 while micros < current_min {
168 match self.min_rtt.compare_exchange_weak(
169 current_min,
170 micros,
171 Ordering::Relaxed,
172 Ordering::Relaxed,
173 ) {
174 Ok(_) => break,
175 Err(x) => current_min = x,
176 }
177 }
178
179 let mut current_max = self.max_rtt.load(Ordering::Relaxed);
181 while micros > current_max {
182 match self.max_rtt.compare_exchange_weak(
183 current_max,
184 micros,
185 Ordering::Relaxed,
186 Ordering::Relaxed,
187 ) {
188 Ok(_) => break,
189 Err(x) => current_max = x,
190 }
191 }
192
193 self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
195 self.count.fetch_add(1, Ordering::Relaxed);
196
197 if let Ok(mut samples) = self.samples.lock() {
199 if samples.len() < 1000 {
200 samples.push(rtt);
201 }
202 }
203 }
204
205 pub fn summary(&self) -> LatencySummary {
207 let count = self.count.load(Ordering::Relaxed);
208 let min_rtt = self.min_rtt.load(Ordering::Relaxed);
209
210 LatencySummary {
211 min_rtt: if min_rtt == u64::MAX {
212 Duration::from_micros(0)
213 } else {
214 Duration::from_micros(min_rtt)
215 },
216 max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
217 avg_rtt: if count > 0 {
218 Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
219 } else {
220 Duration::from_micros(0)
221 },
222 sample_count: count,
223 }
224 }
225}
226
227#[derive(Debug)]
229pub struct ConnectionMetrics {
230 active_connections: AtomicUsize,
231 total_connections: AtomicU64,
232 failed_connections: AtomicU64,
233 migrated_connections: AtomicU64,
234}
235
236impl Default for ConnectionMetrics {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242impl ConnectionMetrics {
243 pub fn new() -> Self {
245 Self {
246 active_connections: AtomicUsize::new(0),
247 total_connections: AtomicU64::new(0),
248 failed_connections: AtomicU64::new(0),
249 migrated_connections: AtomicU64::new(0),
250 }
251 }
252
253 pub fn connection_opened(&self) {
255 self.active_connections.fetch_add(1, Ordering::Relaxed);
256 self.total_connections.fetch_add(1, Ordering::Relaxed);
257 }
258
259 pub fn connection_closed(&self) {
261 self.active_connections.fetch_sub(1, Ordering::Relaxed);
262 }
263
264 pub fn connection_failed(&self) {
266 self.failed_connections.fetch_add(1, Ordering::Relaxed);
267 }
268
269 pub fn connection_migrated(&self) {
271 self.migrated_connections.fetch_add(1, Ordering::Relaxed);
272 }
273
274 pub fn summary(&self) -> ConnectionMetricsSummary {
276 ConnectionMetricsSummary {
277 active_connections: self.active_connections.load(Ordering::Relaxed),
278 total_connections: self.total_connections.load(Ordering::Relaxed),
279 failed_connections: self.failed_connections.load(Ordering::Relaxed),
280 migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
281 }
282 }
283}
284
285#[derive(Debug, Clone)]
287pub struct MetricsSummary {
288 pub event_counts: HashMap<(tracing::Level, String), u64>,
290 pub throughput: ThroughputSummary,
292 pub latency: LatencySummary,
294 pub connections: ConnectionMetricsSummary,
296}
297
298#[derive(Debug, Clone)]
300pub struct ThroughputMetrics {
301 pub bytes_sent: u64,
303 pub bytes_received: u64,
305 pub duration: Duration,
307 pub packets_sent: u64,
309 pub packets_received: u64,
311}
312
313#[derive(Debug, Clone)]
315pub struct ThroughputSummary {
316 pub bytes_sent: u64,
318 pub bytes_received: u64,
320 pub packets_sent: u64,
322 pub packets_received: u64,
324 pub duration: Duration,
326 pub send_rate_mbps: f64,
328 pub recv_rate_mbps: f64,
330}
331
332#[derive(Debug, Clone)]
334pub struct LatencyMetrics {
335 pub rtt: Duration,
337 pub min_rtt: Duration,
339 pub max_rtt: Duration,
341 pub smoothed_rtt: Duration,
343}
344
345#[derive(Debug, Clone)]
347pub struct LatencySummary {
348 pub min_rtt: Duration,
350 pub max_rtt: Duration,
352 pub avg_rtt: Duration,
354 pub sample_count: u64,
356}
357
358#[derive(Debug, Clone)]
360pub struct ConnectionMetricsSummary {
361 pub active_connections: usize,
363 pub total_connections: u64,
365 pub failed_connections: u64,
367 pub migrated_connections: u64,
369}
370
371pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
373 let duration_secs = metrics.duration.as_secs_f64();
374 let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
375 let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
376
377 let mut fields = HashMap::new();
378 fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
379 fields.insert(
380 "bytes_received".to_string(),
381 metrics.bytes_received.to_string(),
382 );
383 fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
384 fields.insert(
385 "packets_received".to_string(),
386 metrics.packets_received.to_string(),
387 );
388 fields.insert(
389 "duration_ms".to_string(),
390 metrics.duration.as_millis().to_string(),
391 );
392 fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
393 fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
394
395 logger().log_event(LogEvent {
396 timestamp: Instant::now(),
397 level: tracing::Level::INFO,
398 target: "ant_quic::metrics::throughput".to_string(),
399 message: "throughput_metrics".to_string(),
400 fields,
401 span_id: None,
402 });
403}
404
405pub fn log_latency_metrics(metrics: &LatencyMetrics) {
407 let mut fields = HashMap::new();
408 fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
409 fields.insert(
410 "min_rtt_ms".to_string(),
411 metrics.min_rtt.as_millis().to_string(),
412 );
413 fields.insert(
414 "max_rtt_ms".to_string(),
415 metrics.max_rtt.as_millis().to_string(),
416 );
417 fields.insert(
418 "smoothed_rtt_ms".to_string(),
419 metrics.smoothed_rtt.as_millis().to_string(),
420 );
421
422 logger().log_event(LogEvent {
423 timestamp: Instant::now(),
424 level: tracing::Level::INFO,
425 target: "ant_quic::metrics::latency".to_string(),
426 message: "latency_metrics".to_string(),
427 fields,
428 span_id: None,
429 });
430}