1use parking_lot::RwLock;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tracing::{info, warn};
7
8pub struct MetricsCollector {
10 pub requests: RequestMetrics,
12 pub connections: ConnectionMetrics,
14 pub performance: PerformanceMetrics,
16 pub errors: ErrorMetrics,
18 pub resources: ResourceMetrics,
20}
21
22#[derive(Default)]
24pub struct RequestMetrics {
25 pub total_requests: AtomicU64,
27 pub successful_requests: AtomicU64,
29 pub failed_requests: AtomicU64,
31 pub requests_by_method: RwLock<HashMap<String, AtomicU64>>,
33 pub requests_by_status: RwLock<HashMap<u16, AtomicU64>>,
35 pub active_requests: AtomicUsize,
37}
38
/// Counters describing connections and connection-pool usage.
///
/// Every counter starts at zero (`Default`).
#[derive(Default)]
pub struct ConnectionMetrics {
    /// Number of connections created.
    pub total_connections: AtomicU64,

    /// Number of connections currently counted as active.
    pub active_connections: AtomicUsize,

    /// Number of connection attempts that failed.
    pub failed_connections: AtomicU64,

    /// Pool size gauge. Nothing in this file writes it; presumably it is
    /// maintained by the pool implementation (verify against callers).
    pub pool_size: AtomicUsize,

    /// Connections that were served from the pool.
    pub pool_hits: AtomicU64,

    /// Connections that could not be served from the pool.
    pub pool_misses: AtomicU64,
}
55
56pub struct PerformanceMetrics {
58 pub request_latencies: RwLock<LatencyTracker>,
60 pub connection_latencies: RwLock<LatencyTracker>,
62 pub throughput: RwLock<ThroughputTracker>,
64 pub retry_stats: RwLock<RetryStats>,
66}
67
68#[derive(Default)]
70pub struct ErrorMetrics {
71 pub errors_by_type: RwLock<HashMap<String, AtomicU64>>,
73 pub total_errors: AtomicU64,
75 pub circuit_breaker_trips: AtomicU64,
77 pub timeout_errors: AtomicU64,
79 pub connection_errors: AtomicU64,
81}
82
83#[derive(Default)]
85pub struct ResourceMetrics {
86 pub buffer_pools: RwLock<BufferPoolStats>,
88 pub parser_cache: RwLock<ParserCacheStats>,
90 pub memory_usage: AtomicU64,
92 pub cpu_time: RwLock<CpuTimeTracker>,
94}
95
/// Bounded buffer of latency samples from which averages and percentiles
/// are computed.
pub struct LatencyTracker {
    /// Recorded samples, in insertion order.
    samples: Vec<Duration>,
    /// Instant at which the tracker was created or last reset.
    last_reset: Instant,
    /// Size at which the sample buffer is considered full.
    max_samples: usize,
}
102
/// Tracks how many requests were recorded over a recent time window.
pub struct ThroughputTracker {
    /// Recorded windows as `(start instant, request count)` pairs, oldest first.
    windows: Vec<(Instant, u64)>,

    /// How long a window stays relevant before it is discarded.
    window_size: Duration,
    /// Upper bound on the number of retained windows.
    max_windows: usize,
}
110
/// Counters describing retry attempts.
///
/// Every counter starts at zero and the map starts empty (`Default`).
#[derive(Default)]
pub struct RetryStats {
    /// Number of retries recorded.
    pub total_retries: AtomicU64,

    /// Retries that were reported as successful.
    pub successful_retries: AtomicU64,

    /// Retries that were reported as failed.
    pub failed_retries: AtomicU64,

    /// Retry counters keyed by attempt number.
    pub retries_by_attempt: HashMap<usize, AtomicU64>,
}
123
/// Point-in-time statistics about the buffer pools.
///
/// A plain value type: it is published to and copied out of the metrics
/// collector as a whole.
#[derive(Debug, Clone, Default)]
pub struct BufferPoolStats {
    /// Size of the small-buffer pool.
    pub small_pool_size: usize,
    /// Size of the medium-buffer pool.
    pub medium_pool_size: usize,
    /// Size of the large-buffer pool.
    pub large_pool_size: usize,
    /// Total number of buffer allocations.
    pub total_allocations: u64,
    /// Total number of buffer reuses.
    pub total_reuses: u64,
}
133
/// Point-in-time statistics about the parser cache.
///
/// A plain value type: it is published to and copied out of the metrics
/// collector as a whole.
#[derive(Debug, Clone, Default)]
pub struct ParserCacheStats {
    /// Number of entries in the cache.
    pub cache_size: usize,
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses.
    pub cache_misses: u64,
    /// Hit rate as supplied by the publisher; this file does not compute it.
    pub hit_rate: f64,
}
142
/// Placeholder for CPU time tracking; it currently has no fields.
#[derive(Default)]
pub struct CpuTimeTracker {}
148
149impl Default for PerformanceMetrics {
150 fn default() -> Self {
151 Self {
152 request_latencies: RwLock::new(LatencyTracker::new(1000)),
153 connection_latencies: RwLock::new(LatencyTracker::new(500)),
154 throughput: RwLock::new(ThroughputTracker::new(Duration::from_secs(60), 100)),
155 retry_stats: RwLock::new(RetryStats::default()),
156 }
157 }
158}
159
160impl LatencyTracker {
161 pub fn new(max_samples: usize) -> Self {
162 Self {
163 samples: Vec::with_capacity(max_samples),
164 last_reset: Instant::now(),
165 max_samples,
166 }
167 }
168
169 pub fn record(&mut self, latency: Duration) {
170 if self.samples.len() >= self.max_samples {
171 self.samples.drain(0..self.max_samples / 2);
173 }
174 self.samples.push(latency);
175 }
176
177 pub fn percentile(&self, p: f64) -> Option<Duration> {
178 if self.samples.is_empty() {
179 return None;
180 }
181
182 let mut sorted = self.samples.clone();
183 sorted.sort();
184
185 let index = ((sorted.len() - 1) as f64 * p / 100.0).round() as usize;
186 Some(sorted[index])
187 }
188
189 pub fn average(&self) -> Option<Duration> {
190 if self.samples.is_empty() {
191 return None;
192 }
193
194 let sum: Duration = self.samples.iter().sum();
195 Some(sum / self.samples.len() as u32)
196 }
197
198 pub fn count(&self) -> usize {
199 self.samples.len()
200 }
201
202 pub fn reset(&mut self) {
203 self.samples.clear();
204 self.last_reset = Instant::now();
205 }
206}
207
208impl ThroughputTracker {
209 pub fn new(window_size: Duration, max_windows: usize) -> Self {
210 Self {
211 windows: Vec::with_capacity(max_windows),
212 window_size,
213 max_windows,
214 }
215 }
216
217 pub fn record_request(&mut self) {
218 let now = Instant::now();
219
220 self.windows
222 .retain(|(timestamp, _)| now.duration_since(*timestamp) <= self.window_size);
223
224 if let Some((_, count)) = self.windows.last_mut() {
226 *count += 1;
227 } else {
228 if self.windows.len() >= self.max_windows {
229 self.windows.remove(0);
230 }
231 self.windows.push((now, 1));
232 }
233 }
234
235 pub fn requests_per_second(&self) -> f64 {
236 if self.windows.is_empty() {
237 return 0.0;
238 }
239
240 let total_requests: u64 = self.windows.iter().map(|(_, count)| *count).sum();
241 let last_timestamp = match self.windows.last() {
242 Some((timestamp, _)) => *timestamp,
243 None => return 0.0,
244 };
245
246 let time_span = last_timestamp.duration_since(self.windows[0].0);
247
248 if time_span.as_secs_f64() > 0.0 {
249 total_requests as f64 / time_span.as_secs_f64()
250 } else {
251 0.0
252 }
253 }
254}
255
256impl Default for MetricsCollector {
257 fn default() -> Self {
258 Self::new()
259 }
260}
261
262impl MetricsCollector {
263 pub fn new() -> Self {
264 Self {
265 requests: RequestMetrics::default(),
266 connections: ConnectionMetrics::default(),
267 performance: PerformanceMetrics::default(),
268 errors: ErrorMetrics::default(),
269 resources: ResourceMetrics::default(),
270 }
271 }
272
273 pub fn request_start(&self, method: &str) -> RequestTracker<'_> {
275 self.requests.total_requests.fetch_add(1, Ordering::Relaxed);
276 self.requests
277 .active_requests
278 .fetch_add(1, Ordering::Relaxed);
279
280 {
282 let methods = self.requests.requests_by_method.read();
283 if let Some(counter) = methods.get(method) {
284 counter.fetch_add(1, Ordering::Relaxed);
285 } else {
286 drop(methods);
287 let mut methods = self.requests.requests_by_method.write();
288 methods
289 .entry(method.to_string())
290 .or_insert_with(|| AtomicU64::new(1));
291 }
292 }
293
294 {
296 let mut throughput = self.performance.throughput.write();
297 throughput.record_request();
298 }
299
300 RequestTracker {
301 metrics: self,
302 start_time: Instant::now(),
303 }
304 }
305
306 pub fn connection_created(&self, from_pool: bool) {
308 self.connections
309 .total_connections
310 .fetch_add(1, Ordering::Relaxed);
311 self.connections
312 .active_connections
313 .fetch_add(1, Ordering::Relaxed);
314
315 if from_pool {
316 self.connections.pool_hits.fetch_add(1, Ordering::Relaxed);
317 } else {
318 self.connections.pool_misses.fetch_add(1, Ordering::Relaxed);
319 }
320 }
321
322 pub fn connection_failed(&self) {
324 self.connections
325 .failed_connections
326 .fetch_add(1, Ordering::Relaxed);
327 self.errors
328 .connection_errors
329 .fetch_add(1, Ordering::Relaxed);
330 }
331
332 pub fn record_error(&self, error_type: &str) {
334 self.errors.total_errors.fetch_add(1, Ordering::Relaxed);
335
336 let mut errors = self.errors.errors_by_type.write();
337 errors
338 .entry(error_type.to_string())
339 .or_insert_with(|| AtomicU64::new(0))
340 .fetch_add(1, Ordering::Relaxed);
341 }
342
343 pub fn record_retry(&self, attempt: usize, success: bool) {
345 let mut stats = self.performance.retry_stats.write();
346 stats.total_retries.fetch_add(1, Ordering::Relaxed);
347
348 if success {
349 stats.successful_retries.fetch_add(1, Ordering::Relaxed);
350 } else {
351 stats.failed_retries.fetch_add(1, Ordering::Relaxed);
352 }
353
354 stats
355 .retries_by_attempt
356 .entry(attempt)
357 .or_insert_with(|| AtomicU64::new(0))
358 .fetch_add(1, Ordering::Relaxed);
359 }
360
361 pub fn update_buffer_pool_stats(&self, stats: BufferPoolStats) {
363 *self.resources.buffer_pools.write() = stats;
364 }
365
366 pub fn update_parser_cache_stats(&self, stats: ParserCacheStats) {
367 *self.resources.parser_cache.write() = stats;
368 }
369
370 pub fn snapshot(&self) -> MetricsSnapshot {
372 let request_latencies = self.performance.request_latencies.read();
373 let throughput = self.performance.throughput.read();
374
375 MetricsSnapshot {
376 total_requests: self.requests.total_requests.load(Ordering::Relaxed),
378 successful_requests: self.requests.successful_requests.load(Ordering::Relaxed),
379 failed_requests: self.requests.failed_requests.load(Ordering::Relaxed),
380 active_requests: self.requests.active_requests.load(Ordering::Relaxed),
381
382 total_connections: self.connections.total_connections.load(Ordering::Relaxed),
384 active_connections: self.connections.active_connections.load(Ordering::Relaxed),
385 pool_hits: self.connections.pool_hits.load(Ordering::Relaxed),
386 pool_misses: self.connections.pool_misses.load(Ordering::Relaxed),
387
388 avg_latency: request_latencies.average(),
390 p95_latency: request_latencies.percentile(95.0),
391 p99_latency: request_latencies.percentile(99.0),
392 requests_per_second: throughput.requests_per_second(),
393
394 total_errors: self.errors.total_errors.load(Ordering::Relaxed),
396 timeout_errors: self.errors.timeout_errors.load(Ordering::Relaxed),
397 connection_errors: self.errors.connection_errors.load(Ordering::Relaxed),
398
399 buffer_pool_stats: self.resources.buffer_pools.read().clone(),
401 parser_cache_stats: self.resources.parser_cache.read().clone(),
402 memory_usage: self.resources.memory_usage.load(Ordering::Relaxed),
403
404 timestamp: Instant::now(),
405 }
406 }
407
408 pub fn print_summary(&self) {
410 let snapshot = self.snapshot();
411
412 info!("=== Kode-Bridge Metrics Summary ===");
413 info!(
414 "Requests: {} total, {} active, {} successful, {} failed",
415 snapshot.total_requests, snapshot.active_requests, snapshot.successful_requests, snapshot.failed_requests
416 );
417
418 if let Some(avg) = snapshot.avg_latency {
419 info!("Latency: avg={:.2}ms", avg.as_millis());
420 }
421 if let Some(p95) = snapshot.p95_latency {
422 info!("Latency P95: {:.2}ms", p95.as_millis());
423 }
424
425 info!("Throughput: {:.2} req/s", snapshot.requests_per_second);
426 info!(
427 "Connections: {} total, {} active, pool hit rate: {:.1}%",
428 snapshot.total_connections,
429 snapshot.active_connections,
430 if snapshot.pool_hits + snapshot.pool_misses > 0 {
431 snapshot.pool_hits as f64 / (snapshot.pool_hits + snapshot.pool_misses) as f64 * 100.0
432 } else {
433 0.0
434 }
435 );
436
437 if snapshot.total_errors > 0 {
438 warn!(
439 "Errors: {} total ({} timeout, {} connection)",
440 snapshot.total_errors, snapshot.timeout_errors, snapshot.connection_errors
441 );
442 }
443 }
444}
445
446pub struct RequestTracker<'a> {
448 metrics: &'a MetricsCollector,
449 start_time: Instant,
450}
451
452impl RequestTracker<'_> {
453 pub fn success(self, status_code: u16) {
455 self.complete(true, Some(status_code));
456 }
457
458 pub fn failure(self, error_type: &str) {
460 self.metrics.record_error(error_type);
461 self.complete(false, None);
462 }
463
464 fn complete(self, success: bool, status_code: Option<u16>) {
465 let latency = self.start_time.elapsed();
466
467 {
469 let mut latencies = self.metrics.performance.request_latencies.write();
470 latencies.record(latency);
471 }
472
473 self.metrics
475 .requests
476 .active_requests
477 .fetch_sub(1, Ordering::Relaxed);
478
479 if success {
480 self.metrics
481 .requests
482 .successful_requests
483 .fetch_add(1, Ordering::Relaxed);
484 } else {
485 self.metrics
486 .requests
487 .failed_requests
488 .fetch_add(1, Ordering::Relaxed);
489 }
490
491 if let Some(status) = status_code {
493 let mut status_map = self.metrics.requests.requests_by_status.write();
494 status_map
495 .entry(status)
496 .or_insert_with(|| AtomicU64::new(0))
497 .fetch_add(1, Ordering::Relaxed);
498 }
499 }
500}
501
502#[derive(Debug, Clone)]
504pub struct MetricsSnapshot {
505 pub total_requests: u64,
507 pub successful_requests: u64,
508 pub failed_requests: u64,
509 pub active_requests: usize,
510
511 pub total_connections: u64,
513 pub active_connections: usize,
514 pub pool_hits: u64,
515 pub pool_misses: u64,
516
517 pub avg_latency: Option<Duration>,
519 pub p95_latency: Option<Duration>,
520 pub p99_latency: Option<Duration>,
521 pub requests_per_second: f64,
522
523 pub total_errors: u64,
525 pub timeout_errors: u64,
526 pub connection_errors: u64,
527
528 pub buffer_pool_stats: BufferPoolStats,
530 pub parser_cache_stats: ParserCacheStats,
531 pub memory_usage: u64,
532
533 pub timestamp: Instant,
534}
535
536pub struct HealthChecker {
538 metrics: Arc<MetricsCollector>,
539 thresholds: HealthThresholds,
540}
541
/// Limits used when judging service health.
///
/// Rates are fractions in `0.0..=1.0` (for example `0.05` means 5%).
#[derive(Clone, Debug)]
pub struct HealthThresholds {
    /// Highest acceptable fraction of failed requests.
    pub max_error_rate: f64,
    /// Highest acceptable average request latency.
    pub max_avg_latency: Duration,
    /// Highest acceptable 95th-percentile request latency.
    pub max_p95_latency: Duration,
    /// Lowest acceptable fraction of successful requests.
    pub min_success_rate: f64,
    /// Highest acceptable number of active connections.
    pub max_active_connections: usize,
}
550
551impl Default for HealthThresholds {
552 fn default() -> Self {
553 Self {
554 max_error_rate: 0.05, max_avg_latency: Duration::from_millis(500), max_p95_latency: Duration::from_secs(2), min_success_rate: 0.95, max_active_connections: 1000, }
560 }
561}
562
/// Overall verdict of a health check, from best to worst.
///
/// The enum is fieldless, so equality is total and `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// No threshold was exceeded.
    Healthy,
    /// A non-critical threshold was exceeded.
    Warning,
    /// A critical threshold was exceeded.
    Critical,
}
569
570pub struct HealthReport {
571 pub status: HealthStatus,
572 pub issues: Vec<String>,
573 pub snapshot: MetricsSnapshot,
574}
575
576impl HealthChecker {
577 pub fn new(metrics: Arc<MetricsCollector>) -> Self {
578 Self {
579 metrics,
580 thresholds: HealthThresholds::default(),
581 }
582 }
583
584 pub const fn with_thresholds(mut self, thresholds: HealthThresholds) -> Self {
585 self.thresholds = thresholds;
586 self
587 }
588
589 pub fn check_health(&self) -> HealthReport {
590 let snapshot = self.metrics.snapshot();
591 let mut issues = Vec::new();
592 let mut status = HealthStatus::Healthy;
593
594 if snapshot.total_requests > 0 {
596 let error_rate = snapshot.failed_requests as f64 / snapshot.total_requests as f64;
597 if error_rate > self.thresholds.max_error_rate {
598 issues.push(format!(
599 "High error rate: {:.2}% (threshold: {:.2}%)",
600 error_rate * 100.0,
601 self.thresholds.max_error_rate * 100.0
602 ));
603 status = HealthStatus::Critical;
604 }
605 }
606
607 if let Some(avg_latency) = snapshot.avg_latency {
609 if avg_latency > self.thresholds.max_avg_latency {
610 issues.push(format!(
611 "High average latency: {}ms (threshold: {}ms)",
612 avg_latency.as_millis(),
613 self.thresholds.max_avg_latency.as_millis()
614 ));
615 if status == HealthStatus::Healthy {
616 status = HealthStatus::Warning;
617 }
618 }
619 }
620
621 if let Some(p95_latency) = snapshot.p95_latency {
622 if p95_latency > self.thresholds.max_p95_latency {
623 issues.push(format!(
624 "High P95 latency: {}ms (threshold: {}ms)",
625 p95_latency.as_millis(),
626 self.thresholds.max_p95_latency.as_millis()
627 ));
628 status = HealthStatus::Critical;
629 }
630 }
631
632 if snapshot.active_connections > self.thresholds.max_active_connections {
634 issues.push(format!(
635 "Too many active connections: {} (threshold: {})",
636 snapshot.active_connections, self.thresholds.max_active_connections
637 ));
638 if status == HealthStatus::Healthy {
639 status = HealthStatus::Warning;
640 }
641 }
642
643 HealthReport {
644 status,
645 issues,
646 snapshot,
647 }
648 }
649}
650
651use std::sync::OnceLock;
653
654static GLOBAL_METRICS: OnceLock<Arc<MetricsCollector>> = OnceLock::new();
655
656pub fn global_metrics() -> &'static Arc<MetricsCollector> {
658 GLOBAL_METRICS.get_or_init(|| Arc::new(MetricsCollector::new()))
659}
660
661pub fn init_metrics() -> Arc<MetricsCollector> {
663 Arc::clone(global_metrics())
664}
665
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Three samples: count, mean and median are all reported exactly.
    #[test]
    fn test_latency_tracker() {
        let mut latencies = LatencyTracker::new(100);
        for millis in [100, 200, 300] {
            latencies.record(Duration::from_millis(millis));
        }

        let expected_middle = Some(Duration::from_millis(200));
        assert_eq!(latencies.count(), 3);
        assert_eq!(latencies.average(), expected_middle);
        assert_eq!(latencies.percentile(50.0), expected_middle);
    }

    /// Recording requests never yields a negative rate.
    #[test]
    fn test_throughput_tracker() {
        let mut throughput = ThroughputTracker::new(Duration::from_secs(1), 10);
        for _ in 0..3 {
            throughput.record_request();
        }

        let rps = throughput.requests_per_second();
        assert!(rps >= 0.0);
    }

    /// One successful request is visible in the snapshot and leaves no
    /// request active.
    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        let in_flight = collector.request_start("GET");
        thread::sleep(Duration::from_millis(10));
        in_flight.success(200);

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.total_requests, 1);
        assert_eq!(snapshot.successful_requests, 1);
        assert_eq!(snapshot.active_requests, 0);
    }

    /// A collector without any traffic is healthy and reports no issues.
    #[test]
    fn test_health_checker() {
        let checker = HealthChecker::new(Arc::new(MetricsCollector::new()));

        let report = checker.check_health();
        assert_eq!(report.status, HealthStatus::Healthy);
        assert!(report.issues.is_empty());
    }
}