1use parking_lot::RwLock;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tracing::{info, warn};
7
8pub struct MetricsCollector {
10 pub requests: RequestMetrics,
12 pub connections: ConnectionMetrics,
14 pub performance: PerformanceMetrics,
16 pub errors: ErrorMetrics,
18 pub resources: ResourceMetrics,
20}
21
22#[derive(Default)]
24pub struct RequestMetrics {
25 pub total_requests: AtomicU64,
27 pub successful_requests: AtomicU64,
29 pub failed_requests: AtomicU64,
31 pub requests_by_method: RwLock<HashMap<String, AtomicU64>>,
33 pub requests_by_status: RwLock<HashMap<u16, AtomicU64>>,
35 pub active_requests: AtomicUsize,
37}
38
/// Counters describing connections and connection-pool usage.
#[derive(Default)]
pub struct ConnectionMetrics {
    /// Number of connections reported via `MetricsCollector::connection_created`.
    pub total_connections: AtomicU64,
    /// Connections currently considered open.
    ///
    /// NOTE(review): this file only ever increments this counter; the
    /// decrement presumably happens in caller code — confirm.
    pub active_connections: AtomicUsize,
    /// Number of connection attempts reported as failed.
    pub failed_connections: AtomicU64,
    /// Current pool size. Not written anywhere in this file.
    pub pool_size: AtomicUsize,
    /// Connections that were served from the pool.
    pub pool_hits: AtomicU64,
    /// Connections that could not be served from the pool.
    pub pool_misses: AtomicU64,
}
55
56pub struct PerformanceMetrics {
58 pub request_latencies: RwLock<LatencyTracker>,
60 pub connection_latencies: RwLock<LatencyTracker>,
62 pub throughput: RwLock<ThroughputTracker>,
64 pub retry_stats: RwLock<RetryStats>,
66}
67
68#[derive(Default)]
70pub struct ErrorMetrics {
71 pub errors_by_type: RwLock<HashMap<String, AtomicU64>>,
73 pub total_errors: AtomicU64,
75 pub circuit_breaker_trips: AtomicU64,
77 pub timeout_errors: AtomicU64,
79 pub connection_errors: AtomicU64,
81}
82
83#[derive(Default)]
85pub struct ResourceMetrics {
86 pub buffer_pools: RwLock<BufferPoolStats>,
88 pub parser_cache: RwLock<ParserCacheStats>,
90 pub memory_usage: AtomicU64,
92 pub cpu_time: RwLock<CpuTimeTracker>,
94}
95
/// Bounded buffer of latency samples with simple statistics on top.
pub struct LatencyTracker {
    /// Retained samples in insertion order (oldest first).
    samples: Vec<Duration>,
    /// Instant of construction or of the most recent `reset`.
    last_reset: Instant,
    /// Sample count at which older samples start being evicted.
    max_samples: usize,
}
102
/// Tracks request counts in timestamped windows to estimate throughput.
pub struct ThroughputTracker {
    /// Recorded windows as `(start instant, request count)` pairs, oldest first.
    windows: Vec<(Instant, u64)>,
    /// Age beyond which a window is discarded when a new request is recorded.
    window_size: Duration,
    /// Intended upper bound on the number of retained windows.
    max_windows: usize,
}
110
/// Counters describing retry behaviour.
#[derive(Default)]
pub struct RetryStats {
    /// Total number of recorded retries.
    pub total_retries: AtomicU64,
    /// Retries recorded as successful.
    pub successful_retries: AtomicU64,
    /// Retries recorded as failed.
    pub failed_retries: AtomicU64,
    /// Retry count keyed by the attempt number supplied by the caller.
    pub retries_by_attempt: HashMap<usize, AtomicU64>,
}
123
/// Plain-data snapshot of buffer-pool usage, supplied by the pool owner.
#[derive(Default, Clone, Debug)]
pub struct BufferPoolStats {
    /// Size reported for the small-buffer pool.
    pub small_pool_size: usize,
    /// Size reported for the medium-buffer pool.
    pub medium_pool_size: usize,
    /// Size reported for the large-buffer pool.
    pub large_pool_size: usize,
    /// Total number of buffer allocations.
    pub total_allocations: u64,
    /// Total number of buffer reuses.
    pub total_reuses: u64,
}
133
/// Plain-data snapshot of parser-cache usage, supplied by the cache owner.
#[derive(Default, Clone, Debug)]
pub struct ParserCacheStats {
    /// Number of entries reported for the cache.
    pub cache_size: usize,
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses.
    pub cache_misses: u64,
    /// Hit rate as supplied by the caller; it is stored, not computed, here.
    pub hit_rate: f64,
}
142
/// Placeholder for CPU-time accounting; it currently carries no data.
#[derive(Default)]
pub struct CpuTimeTracker {}
148
149impl Default for PerformanceMetrics {
150 fn default() -> Self {
151 Self {
152 request_latencies: RwLock::new(LatencyTracker::new(1000)),
153 connection_latencies: RwLock::new(LatencyTracker::new(500)),
154 throughput: RwLock::new(ThroughputTracker::new(Duration::from_secs(60), 100)),
155 retry_stats: RwLock::new(RetryStats::default()),
156 }
157 }
158}
159
160impl LatencyTracker {
161 pub fn new(max_samples: usize) -> Self {
162 Self {
163 samples: Vec::with_capacity(max_samples),
164 last_reset: Instant::now(),
165 max_samples,
166 }
167 }
168
169 pub fn record(&mut self, latency: Duration) {
170 if self.samples.len() >= self.max_samples {
171 self.samples.drain(0..self.max_samples / 2);
173 }
174 self.samples.push(latency);
175 }
176
177 pub fn percentile(&self, p: f64) -> Option<Duration> {
178 if self.samples.is_empty() {
179 return None;
180 }
181
182 let mut sorted = self.samples.clone();
183 sorted.sort();
184
185 let index = ((sorted.len() - 1) as f64 * p / 100.0).round() as usize;
186 Some(sorted[index])
187 }
188
189 pub fn average(&self) -> Option<Duration> {
190 if self.samples.is_empty() {
191 return None;
192 }
193
194 let sum: Duration = self.samples.iter().sum();
195 Some(sum / self.samples.len() as u32)
196 }
197
198 pub fn count(&self) -> usize {
199 self.samples.len()
200 }
201
202 pub fn reset(&mut self) {
203 self.samples.clear();
204 self.last_reset = Instant::now();
205 }
206}
207
208impl ThroughputTracker {
209 pub fn new(window_size: Duration, max_windows: usize) -> Self {
210 Self {
211 windows: Vec::with_capacity(max_windows),
212 window_size,
213 max_windows,
214 }
215 }
216
217 pub fn record_request(&mut self) {
218 let now = Instant::now();
219
220 self.windows
222 .retain(|(timestamp, _)| now.duration_since(*timestamp) <= self.window_size);
223
224 if let Some((_, count)) = self.windows.last_mut() {
226 *count += 1;
227 } else {
228 if self.windows.len() >= self.max_windows {
229 self.windows.remove(0);
230 }
231 self.windows.push((now, 1));
232 }
233 }
234
235 pub fn requests_per_second(&self) -> f64 {
236 if self.windows.is_empty() {
237 return 0.0;
238 }
239
240 let total_requests: u64 = self.windows.iter().map(|(_, count)| *count).sum();
241 let time_span = self
242 .windows
243 .last()
244 .unwrap()
245 .0
246 .duration_since(self.windows[0].0);
247
248 if time_span.as_secs_f64() > 0.0 {
249 total_requests as f64 / time_span.as_secs_f64()
250 } else {
251 0.0
252 }
253 }
254}
255
256impl Default for MetricsCollector {
257 fn default() -> Self {
258 Self::new()
259 }
260}
261
262impl MetricsCollector {
263 pub fn new() -> Self {
264 Self {
265 requests: RequestMetrics::default(),
266 connections: ConnectionMetrics::default(),
267 performance: PerformanceMetrics::default(),
268 errors: ErrorMetrics::default(),
269 resources: ResourceMetrics::default(),
270 }
271 }
272
273 pub fn request_start(&self, method: &str) -> RequestTracker<'_> {
275 self.requests.total_requests.fetch_add(1, Ordering::Relaxed);
276 self.requests
277 .active_requests
278 .fetch_add(1, Ordering::Relaxed);
279
280 {
282 let methods = self.requests.requests_by_method.read();
283 if let Some(counter) = methods.get(method) {
284 counter.fetch_add(1, Ordering::Relaxed);
285 } else {
286 drop(methods);
287 let mut methods = self.requests.requests_by_method.write();
288 methods
289 .entry(method.to_string())
290 .or_insert_with(|| AtomicU64::new(1));
291 }
292 }
293
294 {
296 let mut throughput = self.performance.throughput.write();
297 throughput.record_request();
298 }
299
300 RequestTracker {
301 metrics: self,
302 start_time: Instant::now(),
303 }
304 }
305
306 pub fn connection_created(&self, from_pool: bool) {
308 self.connections
309 .total_connections
310 .fetch_add(1, Ordering::Relaxed);
311 self.connections
312 .active_connections
313 .fetch_add(1, Ordering::Relaxed);
314
315 if from_pool {
316 self.connections.pool_hits.fetch_add(1, Ordering::Relaxed);
317 } else {
318 self.connections.pool_misses.fetch_add(1, Ordering::Relaxed);
319 }
320 }
321
322 pub fn connection_failed(&self) {
324 self.connections
325 .failed_connections
326 .fetch_add(1, Ordering::Relaxed);
327 self.errors
328 .connection_errors
329 .fetch_add(1, Ordering::Relaxed);
330 }
331
332 pub fn record_error(&self, error_type: &str) {
334 self.errors.total_errors.fetch_add(1, Ordering::Relaxed);
335
336 let mut errors = self.errors.errors_by_type.write();
337 errors
338 .entry(error_type.to_string())
339 .or_insert_with(|| AtomicU64::new(0))
340 .fetch_add(1, Ordering::Relaxed);
341 }
342
343 pub fn record_retry(&self, attempt: usize, success: bool) {
345 let mut stats = self.performance.retry_stats.write();
346 stats.total_retries.fetch_add(1, Ordering::Relaxed);
347
348 if success {
349 stats.successful_retries.fetch_add(1, Ordering::Relaxed);
350 } else {
351 stats.failed_retries.fetch_add(1, Ordering::Relaxed);
352 }
353
354 stats
355 .retries_by_attempt
356 .entry(attempt)
357 .or_insert_with(|| AtomicU64::new(0))
358 .fetch_add(1, Ordering::Relaxed);
359 }
360
361 pub fn update_buffer_pool_stats(&self, stats: BufferPoolStats) {
363 *self.resources.buffer_pools.write() = stats;
364 }
365
366 pub fn update_parser_cache_stats(&self, stats: ParserCacheStats) {
367 *self.resources.parser_cache.write() = stats;
368 }
369
370 pub fn snapshot(&self) -> MetricsSnapshot {
372 let request_latencies = self.performance.request_latencies.read();
373 let throughput = self.performance.throughput.read();
374
375 MetricsSnapshot {
376 total_requests: self.requests.total_requests.load(Ordering::Relaxed),
378 successful_requests: self.requests.successful_requests.load(Ordering::Relaxed),
379 failed_requests: self.requests.failed_requests.load(Ordering::Relaxed),
380 active_requests: self.requests.active_requests.load(Ordering::Relaxed),
381
382 total_connections: self.connections.total_connections.load(Ordering::Relaxed),
384 active_connections: self.connections.active_connections.load(Ordering::Relaxed),
385 pool_hits: self.connections.pool_hits.load(Ordering::Relaxed),
386 pool_misses: self.connections.pool_misses.load(Ordering::Relaxed),
387
388 avg_latency: request_latencies.average(),
390 p95_latency: request_latencies.percentile(95.0),
391 p99_latency: request_latencies.percentile(99.0),
392 requests_per_second: throughput.requests_per_second(),
393
394 total_errors: self.errors.total_errors.load(Ordering::Relaxed),
396 timeout_errors: self.errors.timeout_errors.load(Ordering::Relaxed),
397 connection_errors: self.errors.connection_errors.load(Ordering::Relaxed),
398
399 buffer_pool_stats: self.resources.buffer_pools.read().clone(),
401 parser_cache_stats: self.resources.parser_cache.read().clone(),
402 memory_usage: self.resources.memory_usage.load(Ordering::Relaxed),
403
404 timestamp: Instant::now(),
405 }
406 }
407
408 pub fn print_summary(&self) {
410 let snapshot = self.snapshot();
411
412 info!("=== Kode-Bridge Metrics Summary ===");
413 info!(
414 "Requests: {} total, {} active, {} successful, {} failed",
415 snapshot.total_requests,
416 snapshot.active_requests,
417 snapshot.successful_requests,
418 snapshot.failed_requests
419 );
420
421 if let Some(avg) = snapshot.avg_latency {
422 info!("Latency: avg={:.2}ms", avg.as_millis());
423 }
424 if let Some(p95) = snapshot.p95_latency {
425 info!("Latency P95: {:.2}ms", p95.as_millis());
426 }
427
428 info!("Throughput: {:.2} req/s", snapshot.requests_per_second);
429 info!(
430 "Connections: {} total, {} active, pool hit rate: {:.1}%",
431 snapshot.total_connections,
432 snapshot.active_connections,
433 if snapshot.pool_hits + snapshot.pool_misses > 0 {
434 snapshot.pool_hits as f64 / (snapshot.pool_hits + snapshot.pool_misses) as f64
435 * 100.0
436 } else {
437 0.0
438 }
439 );
440
441 if snapshot.total_errors > 0 {
442 warn!(
443 "Errors: {} total ({} timeout, {} connection)",
444 snapshot.total_errors, snapshot.timeout_errors, snapshot.connection_errors
445 );
446 }
447 }
448}
449
450pub struct RequestTracker<'a> {
452 metrics: &'a MetricsCollector,
453 start_time: Instant,
454}
455
456impl<'a> RequestTracker<'a> {
457 pub fn success(self, status_code: u16) {
459 self.complete(true, Some(status_code));
460 }
461
462 pub fn failure(self, error_type: &str) {
464 self.metrics.record_error(error_type);
465 self.complete(false, None);
466 }
467
468 fn complete(self, success: bool, status_code: Option<u16>) {
469 let latency = self.start_time.elapsed();
470
471 {
473 let mut latencies = self.metrics.performance.request_latencies.write();
474 latencies.record(latency);
475 }
476
477 self.metrics
479 .requests
480 .active_requests
481 .fetch_sub(1, Ordering::Relaxed);
482
483 if success {
484 self.metrics
485 .requests
486 .successful_requests
487 .fetch_add(1, Ordering::Relaxed);
488 } else {
489 self.metrics
490 .requests
491 .failed_requests
492 .fetch_add(1, Ordering::Relaxed);
493 }
494
495 if let Some(status) = status_code {
497 let mut status_map = self.metrics.requests.requests_by_status.write();
498 status_map
499 .entry(status)
500 .or_insert_with(|| AtomicU64::new(0))
501 .fetch_add(1, Ordering::Relaxed);
502 }
503 }
504}
505
506#[derive(Debug, Clone)]
508pub struct MetricsSnapshot {
509 pub total_requests: u64,
511 pub successful_requests: u64,
512 pub failed_requests: u64,
513 pub active_requests: usize,
514
515 pub total_connections: u64,
517 pub active_connections: usize,
518 pub pool_hits: u64,
519 pub pool_misses: u64,
520
521 pub avg_latency: Option<Duration>,
523 pub p95_latency: Option<Duration>,
524 pub p99_latency: Option<Duration>,
525 pub requests_per_second: f64,
526
527 pub total_errors: u64,
529 pub timeout_errors: u64,
530 pub connection_errors: u64,
531
532 pub buffer_pool_stats: BufferPoolStats,
534 pub parser_cache_stats: ParserCacheStats,
535 pub memory_usage: u64,
536
537 pub timestamp: Instant,
538}
539
540pub struct HealthChecker {
542 metrics: Arc<MetricsCollector>,
543 thresholds: HealthThresholds,
544}
545
/// Limits used by `HealthChecker::check_health`.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Maximum tolerated ratio of failed to total requests, as a fraction
    /// (`0.05` means 5%).
    pub max_error_rate: f64,
    /// Maximum tolerated average request latency.
    pub max_avg_latency: Duration,
    /// Maximum tolerated 95th-percentile request latency.
    pub max_p95_latency: Duration,
    /// Minimum expected success rate as a fraction.
    ///
    /// NOTE(review): `check_health` in this file does not consult this field.
    pub min_success_rate: f64,
    /// Maximum tolerated value of the active-connections counter.
    pub max_active_connections: usize,
}
554
555impl Default for HealthThresholds {
556 fn default() -> Self {
557 Self {
558 max_error_rate: 0.05, max_avg_latency: Duration::from_millis(500), max_p95_latency: Duration::from_secs(2), min_success_rate: 0.95, max_active_connections: 1000, }
564 }
565}
566
/// Overall verdict of a health check, from best to worst.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    /// No threshold was exceeded.
    Healthy,
    /// A softer threshold (average latency, active connections) was exceeded.
    Warning,
    /// A hard threshold (error rate, P95 latency) was exceeded.
    Critical,
}
573
574pub struct HealthReport {
575 pub status: HealthStatus,
576 pub issues: Vec<String>,
577 pub snapshot: MetricsSnapshot,
578}
579
580impl HealthChecker {
581 pub fn new(metrics: Arc<MetricsCollector>) -> Self {
582 Self {
583 metrics,
584 thresholds: HealthThresholds::default(),
585 }
586 }
587
588 pub fn with_thresholds(mut self, thresholds: HealthThresholds) -> Self {
589 self.thresholds = thresholds;
590 self
591 }
592
593 pub fn check_health(&self) -> HealthReport {
594 let snapshot = self.metrics.snapshot();
595 let mut issues = Vec::new();
596 let mut status = HealthStatus::Healthy;
597
598 if snapshot.total_requests > 0 {
600 let error_rate = snapshot.failed_requests as f64 / snapshot.total_requests as f64;
601 if error_rate > self.thresholds.max_error_rate {
602 issues.push(format!(
603 "High error rate: {:.2}% (threshold: {:.2}%)",
604 error_rate * 100.0,
605 self.thresholds.max_error_rate * 100.0
606 ));
607 status = HealthStatus::Critical;
608 }
609 }
610
611 if let Some(avg_latency) = snapshot.avg_latency {
613 if avg_latency > self.thresholds.max_avg_latency {
614 issues.push(format!(
615 "High average latency: {}ms (threshold: {}ms)",
616 avg_latency.as_millis(),
617 self.thresholds.max_avg_latency.as_millis()
618 ));
619 if status == HealthStatus::Healthy {
620 status = HealthStatus::Warning;
621 }
622 }
623 }
624
625 if let Some(p95_latency) = snapshot.p95_latency {
626 if p95_latency > self.thresholds.max_p95_latency {
627 issues.push(format!(
628 "High P95 latency: {}ms (threshold: {}ms)",
629 p95_latency.as_millis(),
630 self.thresholds.max_p95_latency.as_millis()
631 ));
632 status = HealthStatus::Critical;
633 }
634 }
635
636 if snapshot.active_connections > self.thresholds.max_active_connections {
638 issues.push(format!(
639 "Too many active connections: {} (threshold: {})",
640 snapshot.active_connections, self.thresholds.max_active_connections
641 ));
642 if status == HealthStatus::Healthy {
643 status = HealthStatus::Warning;
644 }
645 }
646
647 HealthReport {
648 status,
649 issues,
650 snapshot,
651 }
652 }
653}
654
655use std::sync::OnceLock;
657
658static GLOBAL_METRICS: OnceLock<Arc<MetricsCollector>> = OnceLock::new();
659
660pub fn global_metrics() -> &'static Arc<MetricsCollector> {
662 GLOBAL_METRICS.get_or_init(|| Arc::new(MetricsCollector::new()))
663}
664
665pub fn init_metrics() -> Arc<MetricsCollector> {
667 global_metrics().clone()
668}
669
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Count, mean and median over three known samples.
    #[test]
    fn test_latency_tracker() {
        let mut tracker = LatencyTracker::new(100);

        tracker.record(Duration::from_millis(100));
        tracker.record(Duration::from_millis(200));
        tracker.record(Duration::from_millis(300));

        assert_eq!(tracker.count(), 3);
        assert_eq!(tracker.average(), Some(Duration::from_millis(200)));
        assert_eq!(tracker.percentile(50.0), Some(Duration::from_millis(200)));
    }

    /// Recording requests must never produce a negative rate.
    #[test]
    fn test_throughput_tracker() {
        let mut tracker = ThroughputTracker::new(Duration::from_secs(1), 10);

        tracker.record_request();
        tracker.record_request();
        tracker.record_request();

        let rps = tracker.requests_per_second();
        assert!(rps >= 0.0);
    }

    /// A single successful request shows up in the totals and leaves no
    /// request active.
    #[test]
    fn test_metrics_collector() {
        let metrics = MetricsCollector::new();

        {
            let tracker = metrics.request_start("GET");
            thread::sleep(Duration::from_millis(10));
            tracker.success(200);
        }

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_requests, 1);
        assert_eq!(snapshot.successful_requests, 1);
        assert_eq!(snapshot.active_requests, 0);
    }

    /// A fresh collector is healthy and reports no issues.
    #[test]
    fn test_health_checker() {
        let metrics = Arc::new(MetricsCollector::new());
        let checker = HealthChecker::new(metrics.clone());

        let report = checker.check_health();
        assert_eq!(report.status, HealthStatus::Healthy);
        assert!(report.issues.is_empty());
    }
}