1use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7
8use super::{LogEvent, logger};
9use crate::{Duration, Instant};
10
11pub struct MetricsCollector {
13 event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
15 throughput: Arc<ThroughputTracker>,
17 latency: Arc<LatencyTracker>,
19 connections: Arc<ConnectionMetrics>,
21}
22
23impl Default for MetricsCollector {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl MetricsCollector {
30 pub fn new() -> Self {
32 Self {
33 event_counts: Arc::new(Mutex::new(HashMap::new())),
34 throughput: Arc::new(ThroughputTracker::new()),
35 latency: Arc::new(LatencyTracker::new()),
36 connections: Arc::new(ConnectionMetrics::new()),
37 }
38 }
39
40 pub fn record_event(&self, event: &LogEvent) {
42 if let Ok(mut counts) = self.event_counts.lock() {
43 let key = (event.level, event.target.clone());
44 *counts.entry(key).or_insert(0) += 1;
45 }
46 }
47
48 pub fn summary(&self) -> MetricsSummary {
50 let event_counts = self
51 .event_counts
52 .lock()
53 .map(|counts| counts.clone())
54 .unwrap_or_default();
55
56 MetricsSummary {
57 event_counts,
58 throughput: self.throughput.summary(),
59 latency: self.latency.summary(),
60 connections: self.connections.summary(),
61 }
62 }
63}
64
65pub struct ThroughputTracker {
67 bytes_sent: AtomicU64,
68 bytes_received: AtomicU64,
69 packets_sent: AtomicU64,
70 packets_received: AtomicU64,
71 start_time: Instant,
72}
73
74impl Default for ThroughputTracker {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl ThroughputTracker {
81 pub fn new() -> Self {
82 Self {
83 bytes_sent: AtomicU64::new(0),
84 bytes_received: AtomicU64::new(0),
85 packets_sent: AtomicU64::new(0),
86 packets_received: AtomicU64::new(0),
87 start_time: Instant::now(),
88 }
89 }
90
91 pub fn record_sent(&self, bytes: u64) {
92 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
93 self.packets_sent.fetch_add(1, Ordering::Relaxed);
94 }
95
96 pub fn record_received(&self, bytes: u64) {
97 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
98 self.packets_received.fetch_add(1, Ordering::Relaxed);
99 }
100
101 pub fn summary(&self) -> ThroughputSummary {
102 let duration = self.start_time.elapsed();
103 let duration_secs = duration.as_secs_f64();
104
105 let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
106 let bytes_received = self.bytes_received.load(Ordering::Relaxed);
107
108 ThroughputSummary {
109 bytes_sent,
110 bytes_received,
111 packets_sent: self.packets_sent.load(Ordering::Relaxed),
112 packets_received: self.packets_received.load(Ordering::Relaxed),
113 duration,
114 send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
115 recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
116 }
117 }
118}
119
120pub struct LatencyTracker {
122 samples: Arc<Mutex<Vec<Duration>>>,
123 min_rtt: AtomicU64, max_rtt: AtomicU64, sum_rtt: AtomicU64, count: AtomicU64,
127}
128
129impl Default for LatencyTracker {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl LatencyTracker {
136 pub fn new() -> Self {
137 Self {
138 samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
139 min_rtt: AtomicU64::new(u64::MAX),
140 max_rtt: AtomicU64::new(0),
141 sum_rtt: AtomicU64::new(0),
142 count: AtomicU64::new(0),
143 }
144 }
145
146 pub fn record_rtt(&self, rtt: Duration) {
147 let micros = rtt.as_micros() as u64;
148
149 let mut current_min = self.min_rtt.load(Ordering::Relaxed);
151 while micros < current_min {
152 match self.min_rtt.compare_exchange_weak(
153 current_min,
154 micros,
155 Ordering::Relaxed,
156 Ordering::Relaxed,
157 ) {
158 Ok(_) => break,
159 Err(x) => current_min = x,
160 }
161 }
162
163 let mut current_max = self.max_rtt.load(Ordering::Relaxed);
165 while micros > current_max {
166 match self.max_rtt.compare_exchange_weak(
167 current_max,
168 micros,
169 Ordering::Relaxed,
170 Ordering::Relaxed,
171 ) {
172 Ok(_) => break,
173 Err(x) => current_max = x,
174 }
175 }
176
177 self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
179 self.count.fetch_add(1, Ordering::Relaxed);
180
181 if let Ok(mut samples) = self.samples.lock() {
183 if samples.len() < 1000 {
184 samples.push(rtt);
185 }
186 }
187 }
188
189 pub fn summary(&self) -> LatencySummary {
190 let count = self.count.load(Ordering::Relaxed);
191 let min_rtt = self.min_rtt.load(Ordering::Relaxed);
192
193 LatencySummary {
194 min_rtt: if min_rtt == u64::MAX {
195 Duration::from_micros(0)
196 } else {
197 Duration::from_micros(min_rtt)
198 },
199 max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
200 avg_rtt: if count > 0 {
201 Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
202 } else {
203 Duration::from_micros(0)
204 },
205 sample_count: count,
206 }
207 }
208}
209
210pub struct ConnectionMetrics {
212 active_connections: AtomicUsize,
213 total_connections: AtomicU64,
214 failed_connections: AtomicU64,
215 migrated_connections: AtomicU64,
216}
217
218impl Default for ConnectionMetrics {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224impl ConnectionMetrics {
225 pub fn new() -> Self {
226 Self {
227 active_connections: AtomicUsize::new(0),
228 total_connections: AtomicU64::new(0),
229 failed_connections: AtomicU64::new(0),
230 migrated_connections: AtomicU64::new(0),
231 }
232 }
233
234 pub fn connection_opened(&self) {
235 self.active_connections.fetch_add(1, Ordering::Relaxed);
236 self.total_connections.fetch_add(1, Ordering::Relaxed);
237 }
238
239 pub fn connection_closed(&self) {
240 self.active_connections.fetch_sub(1, Ordering::Relaxed);
241 }
242
243 pub fn connection_failed(&self) {
244 self.failed_connections.fetch_add(1, Ordering::Relaxed);
245 }
246
247 pub fn connection_migrated(&self) {
248 self.migrated_connections.fetch_add(1, Ordering::Relaxed);
249 }
250
251 pub fn summary(&self) -> ConnectionMetricsSummary {
252 ConnectionMetricsSummary {
253 active_connections: self.active_connections.load(Ordering::Relaxed),
254 total_connections: self.total_connections.load(Ordering::Relaxed),
255 failed_connections: self.failed_connections.load(Ordering::Relaxed),
256 migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
257 }
258 }
259}
260
261#[derive(Debug, Clone)]
263pub struct MetricsSummary {
264 pub event_counts: HashMap<(tracing::Level, String), u64>,
265 pub throughput: ThroughputSummary,
266 pub latency: LatencySummary,
267 pub connections: ConnectionMetricsSummary,
268}
269
/// Raw throughput figures consumed by `log_throughput_metrics`.
///
/// Derives `PartialEq`/`Eq` so values can be compared in assertions and
/// `Default` so callers can use struct-update syntax.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ThroughputMetrics {
    /// Bytes sent during `duration`.
    pub bytes_sent: u64,
    /// Bytes received during `duration`.
    pub bytes_received: u64,
    /// Length of the measurement window.
    pub duration: Duration,
    /// Packets sent during `duration`.
    pub packets_sent: u64,
    /// Packets received during `duration`.
    pub packets_received: u64,
}
279
/// Snapshot of throughput counters with derived average rates.
///
/// Derives `PartialEq` so snapshots can be compared in assertions and
/// `Default` for an all-zero value (`Eq` is not possible because of the
/// `f64` rate fields).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ThroughputSummary {
    /// Total bytes sent.
    pub bytes_sent: u64,
    /// Total bytes received.
    pub bytes_received: u64,
    /// Total packets sent.
    pub packets_sent: u64,
    /// Total packets received.
    pub packets_received: u64,
    /// Time span the totals cover.
    pub duration: Duration,
    /// Average send rate in megabits per second.
    pub send_rate_mbps: f64,
    /// Average receive rate in megabits per second.
    pub recv_rate_mbps: f64,
}
291
/// Round-trip-time values consumed by `log_latency_metrics`.
///
/// Derives `PartialEq`/`Eq` so values can be compared in assertions and
/// `Default` so callers can use struct-update syntax.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LatencyMetrics {
    /// Logged as `rtt_ms`.
    pub rtt: Duration,
    /// Logged as `min_rtt_ms`.
    pub min_rtt: Duration,
    /// Logged as `max_rtt_ms`.
    pub max_rtt: Duration,
    /// Logged as `smoothed_rtt_ms`.
    pub smoothed_rtt: Duration,
}
300
/// Snapshot of round-trip-time statistics.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared in assertions and
/// `Default` for an all-zero value.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LatencySummary {
    /// Smallest RTT recorded.
    pub min_rtt: Duration,
    /// Largest RTT recorded.
    pub max_rtt: Duration,
    /// Mean RTT over all recorded samples.
    pub avg_rtt: Duration,
    /// Number of RTT samples recorded.
    pub sample_count: u64,
}
309
/// Snapshot of connection lifecycle counters.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared in assertions and
/// `Default` for an all-zero value.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConnectionMetricsSummary {
    /// Connections currently open.
    pub active_connections: usize,
    /// Connections opened in total.
    pub total_connections: u64,
    /// Connections that failed.
    pub failed_connections: u64,
    /// Connections that migrated.
    pub migrated_connections: u64,
}
318
319pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
321 let duration_secs = metrics.duration.as_secs_f64();
322 let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
323 let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
324
325 let mut fields = HashMap::new();
326 fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
327 fields.insert(
328 "bytes_received".to_string(),
329 metrics.bytes_received.to_string(),
330 );
331 fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
332 fields.insert(
333 "packets_received".to_string(),
334 metrics.packets_received.to_string(),
335 );
336 fields.insert(
337 "duration_ms".to_string(),
338 metrics.duration.as_millis().to_string(),
339 );
340 fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
341 fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
342
343 logger().log_event(LogEvent {
344 timestamp: Instant::now(),
345 level: tracing::Level::INFO,
346 target: "ant_quic::metrics::throughput".to_string(),
347 message: "throughput_metrics".to_string(),
348 fields,
349 span_id: None,
350 });
351}
352
353pub fn log_latency_metrics(metrics: &LatencyMetrics) {
355 let mut fields = HashMap::new();
356 fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
357 fields.insert(
358 "min_rtt_ms".to_string(),
359 metrics.min_rtt.as_millis().to_string(),
360 );
361 fields.insert(
362 "max_rtt_ms".to_string(),
363 metrics.max_rtt.as_millis().to_string(),
364 );
365 fields.insert(
366 "smoothed_rtt_ms".to_string(),
367 metrics.smoothed_rtt.as_millis().to_string(),
368 );
369
370 logger().log_event(LogEvent {
371 timestamp: Instant::now(),
372 level: tracing::Level::INFO,
373 target: "ant_quic::metrics::latency".to_string(),
374 message: "latency_metrics".to_string(),
375 fields,
376 span_id: None,
377 });
378}