1use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex};
7
8use super::{LogEvent, logger};
9use crate::{Duration, Instant};
10
11#[derive(Debug)]
13pub struct MetricsCollector {
14 event_counts: Arc<Mutex<HashMap<(tracing::Level, String), u64>>>,
16 throughput: Arc<ThroughputTracker>,
18 latency: Arc<LatencyTracker>,
20 connections: Arc<ConnectionMetrics>,
22}
23
24impl Default for MetricsCollector {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl MetricsCollector {
31 pub fn new() -> Self {
33 Self {
34 event_counts: Arc::new(Mutex::new(HashMap::new())),
35 throughput: Arc::new(ThroughputTracker::new()),
36 latency: Arc::new(LatencyTracker::new()),
37 connections: Arc::new(ConnectionMetrics::new()),
38 }
39 }
40
41 pub fn record_event(&self, event: &LogEvent) {
43 if let Ok(mut counts) = self.event_counts.lock() {
44 let key = (event.level, event.target.clone());
45 *counts.entry(key).or_insert(0) += 1;
46 }
47 }
48
49 pub fn summary(&self) -> MetricsSummary {
51 let event_counts = self
52 .event_counts
53 .lock()
54 .map(|counts| counts.clone())
55 .unwrap_or_default();
56
57 MetricsSummary {
58 event_counts,
59 throughput: self.throughput.summary(),
60 latency: self.latency.summary(),
61 connections: self.connections.summary(),
62 }
63 }
64}
65
/// Lock-free counters for bytes and packets moved in each direction.
#[derive(Debug)]
pub struct ThroughputTracker {
    /// Total bytes passed to `record_sent`.
    bytes_sent: AtomicU64,
    /// Total bytes passed to `record_received`.
    bytes_received: AtomicU64,
    /// Number of `record_sent` calls.
    packets_sent: AtomicU64,
    /// Number of `record_received` calls.
    packets_received: AtomicU64,
    /// Instant captured when the tracker was constructed; rates in the
    /// summary are averaged over the time elapsed since then.
    start_time: Instant,
}
75
76impl Default for ThroughputTracker {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl ThroughputTracker {
83 pub fn new() -> Self {
84 Self {
85 bytes_sent: AtomicU64::new(0),
86 bytes_received: AtomicU64::new(0),
87 packets_sent: AtomicU64::new(0),
88 packets_received: AtomicU64::new(0),
89 start_time: Instant::now(),
90 }
91 }
92
93 pub fn record_sent(&self, bytes: u64) {
94 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
95 self.packets_sent.fetch_add(1, Ordering::Relaxed);
96 }
97
98 pub fn record_received(&self, bytes: u64) {
99 self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
100 self.packets_received.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub fn summary(&self) -> ThroughputSummary {
104 let duration = self.start_time.elapsed();
105 let duration_secs = duration.as_secs_f64();
106
107 let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
108 let bytes_received = self.bytes_received.load(Ordering::Relaxed);
109
110 ThroughputSummary {
111 bytes_sent,
112 bytes_received,
113 packets_sent: self.packets_sent.load(Ordering::Relaxed),
114 packets_received: self.packets_received.load(Ordering::Relaxed),
115 duration,
116 send_rate_mbps: (bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0),
117 recv_rate_mbps: (bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0),
118 }
119 }
120}
121
/// Round-trip-time statistics: atomic aggregates plus a buffer of raw
/// samples.
#[derive(Debug)]
pub struct LatencyTracker {
    /// Raw RTT samples, guarded by a mutex.
    samples: Arc<Mutex<Vec<Duration>>>,
    /// Smallest RTT seen so far, in microseconds (`u64::MAX` until the first
    /// sample is recorded).
    min_rtt: AtomicU64,
    /// Largest RTT seen so far, in microseconds.
    max_rtt: AtomicU64,
    /// Sum of all recorded RTTs, in microseconds.
    sum_rtt: AtomicU64,
    /// Number of RTTs recorded.
    count: AtomicU64,
}
131
132impl Default for LatencyTracker {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138impl LatencyTracker {
139 pub fn new() -> Self {
140 Self {
141 samples: Arc::new(Mutex::new(Vec::with_capacity(1000))),
142 min_rtt: AtomicU64::new(u64::MAX),
143 max_rtt: AtomicU64::new(0),
144 sum_rtt: AtomicU64::new(0),
145 count: AtomicU64::new(0),
146 }
147 }
148
149 pub fn record_rtt(&self, rtt: Duration) {
150 let micros = rtt.as_micros() as u64;
151
152 let mut current_min = self.min_rtt.load(Ordering::Relaxed);
154 while micros < current_min {
155 match self.min_rtt.compare_exchange_weak(
156 current_min,
157 micros,
158 Ordering::Relaxed,
159 Ordering::Relaxed,
160 ) {
161 Ok(_) => break,
162 Err(x) => current_min = x,
163 }
164 }
165
166 let mut current_max = self.max_rtt.load(Ordering::Relaxed);
168 while micros > current_max {
169 match self.max_rtt.compare_exchange_weak(
170 current_max,
171 micros,
172 Ordering::Relaxed,
173 Ordering::Relaxed,
174 ) {
175 Ok(_) => break,
176 Err(x) => current_max = x,
177 }
178 }
179
180 self.sum_rtt.fetch_add(micros, Ordering::Relaxed);
182 self.count.fetch_add(1, Ordering::Relaxed);
183
184 if let Ok(mut samples) = self.samples.lock() {
186 if samples.len() < 1000 {
187 samples.push(rtt);
188 }
189 }
190 }
191
192 pub fn summary(&self) -> LatencySummary {
193 let count = self.count.load(Ordering::Relaxed);
194 let min_rtt = self.min_rtt.load(Ordering::Relaxed);
195
196 LatencySummary {
197 min_rtt: if min_rtt == u64::MAX {
198 Duration::from_micros(0)
199 } else {
200 Duration::from_micros(min_rtt)
201 },
202 max_rtt: Duration::from_micros(self.max_rtt.load(Ordering::Relaxed)),
203 avg_rtt: if count > 0 {
204 Duration::from_micros(self.sum_rtt.load(Ordering::Relaxed) / count)
205 } else {
206 Duration::from_micros(0)
207 },
208 sample_count: count,
209 }
210 }
211}
212
/// Atomic counters describing connection lifecycle events.
#[derive(Debug)]
pub struct ConnectionMetrics {
    /// Gauge of currently open connections: incremented by
    /// `connection_opened`, decremented by `connection_closed`.
    active_connections: AtomicUsize,
    /// Number of `connection_opened` calls.
    total_connections: AtomicU64,
    /// Number of `connection_failed` calls.
    failed_connections: AtomicU64,
    /// Number of `connection_migrated` calls.
    migrated_connections: AtomicU64,
}
221
222impl Default for ConnectionMetrics {
223 fn default() -> Self {
224 Self::new()
225 }
226}
227
228impl ConnectionMetrics {
229 pub fn new() -> Self {
230 Self {
231 active_connections: AtomicUsize::new(0),
232 total_connections: AtomicU64::new(0),
233 failed_connections: AtomicU64::new(0),
234 migrated_connections: AtomicU64::new(0),
235 }
236 }
237
238 pub fn connection_opened(&self) {
239 self.active_connections.fetch_add(1, Ordering::Relaxed);
240 self.total_connections.fetch_add(1, Ordering::Relaxed);
241 }
242
243 pub fn connection_closed(&self) {
244 self.active_connections.fetch_sub(1, Ordering::Relaxed);
245 }
246
247 pub fn connection_failed(&self) {
248 self.failed_connections.fetch_add(1, Ordering::Relaxed);
249 }
250
251 pub fn connection_migrated(&self) {
252 self.migrated_connections.fetch_add(1, Ordering::Relaxed);
253 }
254
255 pub fn summary(&self) -> ConnectionMetricsSummary {
256 ConnectionMetricsSummary {
257 active_connections: self.active_connections.load(Ordering::Relaxed),
258 total_connections: self.total_connections.load(Ordering::Relaxed),
259 failed_connections: self.failed_connections.load(Ordering::Relaxed),
260 migrated_connections: self.migrated_connections.load(Ordering::Relaxed),
261 }
262 }
263}
264
265#[derive(Debug, Clone)]
267pub struct MetricsSummary {
268 pub event_counts: HashMap<(tracing::Level, String), u64>,
269 pub throughput: ThroughputSummary,
270 pub latency: LatencySummary,
271 pub connections: ConnectionMetricsSummary,
272}
273
/// Raw throughput figures accepted by `log_throughput_metrics`.
#[derive(Clone, Debug)]
pub struct ThroughputMetrics {
    /// Bytes sent during `duration`.
    pub bytes_sent: u64,
    /// Bytes received during `duration`.
    pub bytes_received: u64,
    /// Length of the measurement window the counters refer to.
    pub duration: Duration,
    /// Packets sent during `duration`.
    pub packets_sent: u64,
    /// Packets received during `duration`.
    pub packets_received: u64,
}
283
/// Snapshot produced by `ThroughputTracker::summary`.
#[derive(Clone, Debug)]
pub struct ThroughputSummary {
    /// Total bytes recorded as sent.
    pub bytes_sent: u64,
    /// Total bytes recorded as received.
    pub bytes_received: u64,
    /// Total packets recorded as sent.
    pub packets_sent: u64,
    /// Total packets recorded as received.
    pub packets_received: u64,
    /// Time elapsed since the tracker was created.
    pub duration: Duration,
    /// Average send rate over `duration`, in megabits per second.
    pub send_rate_mbps: f64,
    /// Average receive rate over `duration`, in megabits per second.
    pub recv_rate_mbps: f64,
}
295
/// Round-trip-time readings accepted by `log_latency_metrics`.
#[derive(Clone, Debug)]
pub struct LatencyMetrics {
    /// RTT reading, logged as `rtt_ms`.
    pub rtt: Duration,
    /// Minimum RTT, logged as `min_rtt_ms`.
    pub min_rtt: Duration,
    /// Maximum RTT, logged as `max_rtt_ms`.
    pub max_rtt: Duration,
    /// Smoothed RTT, logged as `smoothed_rtt_ms`.
    pub smoothed_rtt: Duration,
}
304
/// Snapshot produced by `LatencyTracker::summary`.
#[derive(Clone, Debug)]
pub struct LatencySummary {
    /// Smallest recorded RTT (zero when nothing has been recorded).
    pub min_rtt: Duration,
    /// Largest recorded RTT.
    pub max_rtt: Duration,
    /// Mean of the recorded RTTs (zero when nothing has been recorded).
    pub avg_rtt: Duration,
    /// Number of RTT measurements recorded.
    pub sample_count: u64,
}
313
/// Snapshot produced by `ConnectionMetrics::summary`.
#[derive(Clone, Debug)]
pub struct ConnectionMetricsSummary {
    /// Connections open at the time of the snapshot.
    pub active_connections: usize,
    /// Connections opened so far.
    pub total_connections: u64,
    /// Connection failures recorded so far.
    pub failed_connections: u64,
    /// Connection migrations recorded so far.
    pub migrated_connections: u64,
}
322
323pub fn log_throughput_metrics(metrics: &ThroughputMetrics) {
325 let duration_secs = metrics.duration.as_secs_f64();
326 let send_rate_mbps = (metrics.bytes_sent as f64 * 8.0) / (duration_secs * 1_000_000.0);
327 let recv_rate_mbps = (metrics.bytes_received as f64 * 8.0) / (duration_secs * 1_000_000.0);
328
329 let mut fields = HashMap::new();
330 fields.insert("bytes_sent".to_string(), metrics.bytes_sent.to_string());
331 fields.insert(
332 "bytes_received".to_string(),
333 metrics.bytes_received.to_string(),
334 );
335 fields.insert("packets_sent".to_string(), metrics.packets_sent.to_string());
336 fields.insert(
337 "packets_received".to_string(),
338 metrics.packets_received.to_string(),
339 );
340 fields.insert(
341 "duration_ms".to_string(),
342 metrics.duration.as_millis().to_string(),
343 );
344 fields.insert("send_rate_mbps".to_string(), format!("{send_rate_mbps:.2}"));
345 fields.insert("recv_rate_mbps".to_string(), format!("{recv_rate_mbps:.2}"));
346
347 logger().log_event(LogEvent {
348 timestamp: Instant::now(),
349 level: tracing::Level::INFO,
350 target: "ant_quic::metrics::throughput".to_string(),
351 message: "throughput_metrics".to_string(),
352 fields,
353 span_id: None,
354 });
355}
356
357pub fn log_latency_metrics(metrics: &LatencyMetrics) {
359 let mut fields = HashMap::new();
360 fields.insert("rtt_ms".to_string(), metrics.rtt.as_millis().to_string());
361 fields.insert(
362 "min_rtt_ms".to_string(),
363 metrics.min_rtt.as_millis().to_string(),
364 );
365 fields.insert(
366 "max_rtt_ms".to_string(),
367 metrics.max_rtt.as_millis().to_string(),
368 );
369 fields.insert(
370 "smoothed_rtt_ms".to_string(),
371 metrics.smoothed_rtt.as_millis().to_string(),
372 );
373
374 logger().log_event(LogEvent {
375 timestamp: Instant::now(),
376 level: tracing::Level::INFO,
377 target: "ant_quic::metrics::latency".to_string(),
378 message: "latency_metrics".to_string(),
379 fields,
380 span_id: None,
381 });
382}