Skip to main content

kode_bridge/
metrics.rs

1use parking_lot::RwLock;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tracing::{info, warn};
7
/// Global metrics collection system.
///
/// Groups every metric family tracked by this module. All fields are public,
/// so callers can read or update individual counters directly as well as
/// through the helper methods implemented on this type.
pub struct MetricsCollector {
    /// Request counters: totals, outcomes, in-flight gauge and keyed breakdowns
    pub requests: RequestMetrics,
    /// Connection counters and connection-pool hit/miss counters
    pub connections: ConnectionMetrics,
    /// Latency samples, throughput windows and retry statistics
    pub performance: PerformanceMetrics,
    /// Error counters, in total and broken down by error type
    pub errors: ErrorMetrics,
    /// Buffer-pool, parser-cache and memory usage figures
    pub resources: ResourceMetrics,
}
21
/// Request-related metrics.
///
/// Scalar values are atomics; the two keyed breakdowns are hash maps behind an
/// `RwLock` whose values are themselves atomic counters.
#[derive(Default)]
pub struct RequestMetrics {
    /// Total requests started
    pub total_requests: AtomicU64,
    /// Requests that completed successfully
    pub successful_requests: AtomicU64,
    /// Requests that completed with a failure
    pub failed_requests: AtomicU64,
    /// Request count keyed by method name
    pub requests_by_method: RwLock<HashMap<String, AtomicU64>>,
    /// Request count keyed by status code
    pub requests_by_status: RwLock<HashMap<u16, AtomicU64>>,
    /// Active requests (in-flight)
    pub active_requests: AtomicUsize,
}
38
/// Connection-related metrics.
#[derive(Default)]
pub struct ConnectionMetrics {
    /// Total connection events recorded (pooled and fresh alike)
    pub total_connections: AtomicU64,
    /// Active connections
    pub active_connections: AtomicUsize,
    /// Failed connection attempts
    pub failed_connections: AtomicU64,
    /// Connection pool size.
    // NOTE(review): no code visible in this module writes this field; it is
    // presumably maintained by the pool implementation - confirm.
    pub pool_size: AtomicUsize,
    /// Pool hits (reused connections)
    pub pool_hits: AtomicU64,
    /// Pool misses (new connections)
    pub pool_misses: AtomicU64,
}
55
/// Performance-related metrics.
///
/// Every tracker sits behind its own `RwLock`, so each can be updated
/// independently of the others.
pub struct PerformanceMetrics {
    /// Request latency samples
    pub request_latencies: RwLock<LatencyTracker>,
    /// Connection establishment latency samples
    pub connection_latencies: RwLock<LatencyTracker>,
    /// Throughput tracking
    pub throughput: RwLock<ThroughputTracker>,
    /// Retry statistics
    pub retry_stats: RwLock<RetryStats>,
}
67
/// Error tracking metrics.
#[derive(Default)]
pub struct ErrorMetrics {
    /// Error count keyed by caller-supplied error type name
    pub errors_by_type: RwLock<HashMap<String, AtomicU64>>,
    /// Total errors
    pub total_errors: AtomicU64,
    /// Circuit breaker trips.
    // NOTE(review): not incremented by any code visible in this module;
    // presumably updated directly by the circuit breaker - confirm.
    pub circuit_breaker_trips: AtomicU64,
    /// Timeout errors.
    // NOTE(review): not incremented by any code visible in this module - confirm
    // who maintains it.
    pub timeout_errors: AtomicU64,
    /// Connection errors
    pub connection_errors: AtomicU64,
}
82
/// Resource usage metrics.
#[derive(Default)]
pub struct ResourceMetrics {
    /// Most recently published buffer pool statistics
    pub buffer_pools: RwLock<BufferPoolStats>,
    /// Most recently published parser cache statistics
    pub parser_cache: RwLock<ParserCacheStats>,
    /// Memory usage estimate.
    // NOTE(review): the unit is not established in this module (presumably
    // bytes) and nothing visible here writes the value - confirm.
    pub memory_usage: AtomicU64,
    /// CPU time tracking (currently an empty placeholder type)
    pub cpu_time: RwLock<CpuTimeTracker>,
}
95
/// Latency tracking with percentiles.
///
/// Holds a bounded list of latency samples from which the average and
/// arbitrary percentiles are computed on demand.
pub struct LatencyTracker {
    /// Recorded latencies in insertion order (oldest first)
    samples: Vec<Duration>,
    /// When the tracker was created or last reset
    last_reset: Instant,
    /// Sample count at which older samples start being discarded
    max_samples: usize,
}
102
/// Throughput tracking.
///
/// Requests are counted in time windows; each entry pairs the instant at
/// which a window was opened with the number of requests counted in it.
pub struct ThroughputTracker {
    /// Requests per time window, as `(window start, request count)` pairs
    windows: Vec<(Instant, u64)>,
    /// Time span associated with a single window
    window_size: Duration,
    /// Upper bound on the number of windows kept
    max_windows: usize,
}
110
/// Retry statistics.
///
/// The scalar counters are atomics, but `retries_by_attempt` is a plain
/// `HashMap`, so adding a new attempt number requires exclusive (`&mut`)
/// access to the whole struct.
#[derive(Default)]
pub struct RetryStats {
    /// Total retry attempts
    pub total_retries: AtomicU64,
    /// Successful retries (eventual success)
    pub successful_retries: AtomicU64,
    /// Failed retries (gave up)
    pub failed_retries: AtomicU64,
    /// Retry counts keyed by attempt number
    pub retries_by_attempt: HashMap<usize, AtomicU64>,
}
123
/// Buffer pool statistics.
///
/// Plain value type: the collector stores whatever the caller publishes and
/// hands out clones of it in snapshots.
#[derive(Default, Clone, Debug)]
pub struct BufferPoolStats {
    /// Size reported for the small-buffer pool
    pub small_pool_size: usize,
    /// Size reported for the medium-buffer pool
    pub medium_pool_size: usize,
    /// Size reported for the large-buffer pool
    pub large_pool_size: usize,
    /// Total buffer allocations reported
    pub total_allocations: u64,
    /// Total buffer reuses reported
    pub total_reuses: u64,
}
133
/// Parser cache statistics.
///
/// Plain value type: the collector stores whatever the caller publishes and
/// hands out clones of it in snapshots.
#[derive(Default, Clone, Debug)]
pub struct ParserCacheStats {
    /// Reported cache size
    pub cache_size: usize,
    /// Reported number of cache hits
    pub cache_hits: u64,
    /// Reported number of cache misses
    pub cache_misses: u64,
    /// Reported hit rate.
    // NOTE(review): supplied by the publisher, not derived here from
    // `cache_hits`/`cache_misses`; scale (0-1 vs 0-100) is not shown - confirm.
    pub hit_rate: f64,
}
142
/// CPU time tracking.
///
/// Currently an empty type that only reserves a slot in `ResourceMetrics`.
#[derive(Default)]
pub struct CpuTimeTracker {
    // Placeholder for future CPU monitoring
}
148
149impl Default for PerformanceMetrics {
150    fn default() -> Self {
151        Self {
152            request_latencies: RwLock::new(LatencyTracker::new(1000)),
153            connection_latencies: RwLock::new(LatencyTracker::new(500)),
154            throughput: RwLock::new(ThroughputTracker::new(Duration::from_secs(60), 100)),
155            retry_stats: RwLock::new(RetryStats::default()),
156        }
157    }
158}
159
160impl LatencyTracker {
161    pub fn new(max_samples: usize) -> Self {
162        Self {
163            samples: Vec::with_capacity(max_samples),
164            last_reset: Instant::now(),
165            max_samples,
166        }
167    }
168
169    pub fn record(&mut self, latency: Duration) {
170        if self.samples.len() >= self.max_samples {
171            // Keep only recent samples
172            self.samples.drain(0..self.max_samples / 2);
173        }
174        self.samples.push(latency);
175    }
176
177    pub fn percentile(&self, p: f64) -> Option<Duration> {
178        if self.samples.is_empty() {
179            return None;
180        }
181
182        let mut sorted = self.samples.clone();
183        sorted.sort();
184
185        let index = ((sorted.len() - 1) as f64 * p / 100.0).round() as usize;
186        Some(sorted[index])
187    }
188
189    pub fn average(&self) -> Option<Duration> {
190        if self.samples.is_empty() {
191            return None;
192        }
193
194        let sum: Duration = self.samples.iter().sum();
195        Some(sum / self.samples.len() as u32)
196    }
197
198    pub fn count(&self) -> usize {
199        self.samples.len()
200    }
201
202    pub fn reset(&mut self) {
203        self.samples.clear();
204        self.last_reset = Instant::now();
205    }
206}
207
208impl ThroughputTracker {
209    pub fn new(window_size: Duration, max_windows: usize) -> Self {
210        Self {
211            windows: Vec::with_capacity(max_windows),
212            window_size,
213            max_windows,
214        }
215    }
216
217    pub fn record_request(&mut self) {
218        let now = Instant::now();
219
220        // Clean old windows
221        self.windows
222            .retain(|(timestamp, _)| now.duration_since(*timestamp) <= self.window_size);
223
224        // Add to current window or create new one
225        if let Some((_, count)) = self.windows.last_mut() {
226            *count += 1;
227        } else {
228            if self.windows.len() >= self.max_windows {
229                self.windows.remove(0);
230            }
231            self.windows.push((now, 1));
232        }
233    }
234
235    pub fn requests_per_second(&self) -> f64 {
236        if self.windows.is_empty() {
237            return 0.0;
238        }
239
240        let total_requests: u64 = self.windows.iter().map(|(_, count)| *count).sum();
241        let last_timestamp = match self.windows.last() {
242            Some((timestamp, _)) => *timestamp,
243            None => return 0.0,
244        };
245
246        let time_span = last_timestamp.duration_since(self.windows[0].0);
247
248        if time_span.as_secs_f64() > 0.0 {
249            total_requests as f64 / time_span.as_secs_f64()
250        } else {
251            0.0
252        }
253    }
254}
255
impl Default for MetricsCollector {
    /// Equivalent to [`MetricsCollector::new`].
    fn default() -> Self {
        Self::new()
    }
}
261
262impl MetricsCollector {
263    pub fn new() -> Self {
264        Self {
265            requests: RequestMetrics::default(),
266            connections: ConnectionMetrics::default(),
267            performance: PerformanceMetrics::default(),
268            errors: ErrorMetrics::default(),
269            resources: ResourceMetrics::default(),
270        }
271    }
272
273    /// Record a request start
274    pub fn request_start(&self, method: &str) -> RequestTracker<'_> {
275        self.requests.total_requests.fetch_add(1, Ordering::Relaxed);
276        self.requests
277            .active_requests
278            .fetch_add(1, Ordering::Relaxed);
279
280        // Update method counter
281        {
282            let methods = self.requests.requests_by_method.read();
283            if let Some(counter) = methods.get(method) {
284                counter.fetch_add(1, Ordering::Relaxed);
285            } else {
286                drop(methods);
287                let mut methods = self.requests.requests_by_method.write();
288                methods
289                    .entry(method.to_string())
290                    .or_insert_with(|| AtomicU64::new(1));
291            }
292        }
293
294        // Update throughput
295        {
296            let mut throughput = self.performance.throughput.write();
297            throughput.record_request();
298        }
299
300        RequestTracker {
301            metrics: self,
302            start_time: Instant::now(),
303        }
304    }
305
306    /// Record a connection event
307    pub fn connection_created(&self, from_pool: bool) {
308        self.connections
309            .total_connections
310            .fetch_add(1, Ordering::Relaxed);
311        self.connections
312            .active_connections
313            .fetch_add(1, Ordering::Relaxed);
314
315        if from_pool {
316            self.connections.pool_hits.fetch_add(1, Ordering::Relaxed);
317        } else {
318            self.connections.pool_misses.fetch_add(1, Ordering::Relaxed);
319        }
320    }
321
322    /// Record connection failure
323    pub fn connection_failed(&self) {
324        self.connections
325            .failed_connections
326            .fetch_add(1, Ordering::Relaxed);
327        self.errors
328            .connection_errors
329            .fetch_add(1, Ordering::Relaxed);
330    }
331
332    /// Record an error
333    pub fn record_error(&self, error_type: &str) {
334        self.errors.total_errors.fetch_add(1, Ordering::Relaxed);
335
336        let mut errors = self.errors.errors_by_type.write();
337        errors
338            .entry(error_type.to_string())
339            .or_insert_with(|| AtomicU64::new(0))
340            .fetch_add(1, Ordering::Relaxed);
341    }
342
343    /// Record retry attempt
344    pub fn record_retry(&self, attempt: usize, success: bool) {
345        let mut stats = self.performance.retry_stats.write();
346        stats.total_retries.fetch_add(1, Ordering::Relaxed);
347
348        if success {
349            stats.successful_retries.fetch_add(1, Ordering::Relaxed);
350        } else {
351            stats.failed_retries.fetch_add(1, Ordering::Relaxed);
352        }
353
354        stats
355            .retries_by_attempt
356            .entry(attempt)
357            .or_insert_with(|| AtomicU64::new(0))
358            .fetch_add(1, Ordering::Relaxed);
359    }
360
361    /// Update resource usage
362    pub fn update_buffer_pool_stats(&self, stats: BufferPoolStats) {
363        *self.resources.buffer_pools.write() = stats;
364    }
365
366    pub fn update_parser_cache_stats(&self, stats: ParserCacheStats) {
367        *self.resources.parser_cache.write() = stats;
368    }
369
370    /// Get comprehensive metrics snapshot
371    pub fn snapshot(&self) -> MetricsSnapshot {
372        let request_latencies = self.performance.request_latencies.read();
373        let throughput = self.performance.throughput.read();
374
375        MetricsSnapshot {
376            // Request metrics
377            total_requests: self.requests.total_requests.load(Ordering::Relaxed),
378            successful_requests: self.requests.successful_requests.load(Ordering::Relaxed),
379            failed_requests: self.requests.failed_requests.load(Ordering::Relaxed),
380            active_requests: self.requests.active_requests.load(Ordering::Relaxed),
381
382            // Connection metrics
383            total_connections: self.connections.total_connections.load(Ordering::Relaxed),
384            active_connections: self.connections.active_connections.load(Ordering::Relaxed),
385            pool_hits: self.connections.pool_hits.load(Ordering::Relaxed),
386            pool_misses: self.connections.pool_misses.load(Ordering::Relaxed),
387
388            // Performance metrics
389            avg_latency: request_latencies.average(),
390            p95_latency: request_latencies.percentile(95.0),
391            p99_latency: request_latencies.percentile(99.0),
392            requests_per_second: throughput.requests_per_second(),
393
394            // Error metrics
395            total_errors: self.errors.total_errors.load(Ordering::Relaxed),
396            timeout_errors: self.errors.timeout_errors.load(Ordering::Relaxed),
397            connection_errors: self.errors.connection_errors.load(Ordering::Relaxed),
398
399            // Resource metrics
400            buffer_pool_stats: self.resources.buffer_pools.read().clone(),
401            parser_cache_stats: self.resources.parser_cache.read().clone(),
402            memory_usage: self.resources.memory_usage.load(Ordering::Relaxed),
403
404            timestamp: Instant::now(),
405        }
406    }
407
408    /// Print metrics summary
409    pub fn print_summary(&self) {
410        let snapshot = self.snapshot();
411
412        info!("=== Kode-Bridge Metrics Summary ===");
413        info!(
414            "Requests: {} total, {} active, {} successful, {} failed",
415            snapshot.total_requests, snapshot.active_requests, snapshot.successful_requests, snapshot.failed_requests
416        );
417
418        if let Some(avg) = snapshot.avg_latency {
419            info!("Latency: avg={:.2}ms", avg.as_millis());
420        }
421        if let Some(p95) = snapshot.p95_latency {
422            info!("Latency P95: {:.2}ms", p95.as_millis());
423        }
424
425        info!("Throughput: {:.2} req/s", snapshot.requests_per_second);
426        info!(
427            "Connections: {} total, {} active, pool hit rate: {:.1}%",
428            snapshot.total_connections,
429            snapshot.active_connections,
430            if snapshot.pool_hits + snapshot.pool_misses > 0 {
431                snapshot.pool_hits as f64 / (snapshot.pool_hits + snapshot.pool_misses) as f64 * 100.0
432            } else {
433                0.0
434            }
435        );
436
437        if snapshot.total_errors > 0 {
438            warn!(
439                "Errors: {} total ({} timeout, {} connection)",
440                snapshot.total_errors, snapshot.timeout_errors, snapshot.connection_errors
441            );
442        }
443    }
444}
445
446/// RAII tracker for individual requests
447pub struct RequestTracker<'a> {
448    metrics: &'a MetricsCollector,
449    start_time: Instant,
450}
451
452impl RequestTracker<'_> {
453    /// Mark request as completed successfully
454    pub fn success(self, status_code: u16) {
455        self.complete(true, Some(status_code));
456    }
457
458    /// Mark request as failed
459    pub fn failure(self, error_type: &str) {
460        self.metrics.record_error(error_type);
461        self.complete(false, None);
462    }
463
464    fn complete(self, success: bool, status_code: Option<u16>) {
465        let latency = self.start_time.elapsed();
466
467        // Record latency
468        {
469            let mut latencies = self.metrics.performance.request_latencies.write();
470            latencies.record(latency);
471        }
472
473        // Update counters
474        self.metrics
475            .requests
476            .active_requests
477            .fetch_sub(1, Ordering::Relaxed);
478
479        if success {
480            self.metrics
481                .requests
482                .successful_requests
483                .fetch_add(1, Ordering::Relaxed);
484        } else {
485            self.metrics
486                .requests
487                .failed_requests
488                .fetch_add(1, Ordering::Relaxed);
489        }
490
491        // Record status code
492        if let Some(status) = status_code {
493            let mut status_map = self.metrics.requests.requests_by_status.write();
494            status_map
495                .entry(status)
496                .or_insert_with(|| AtomicU64::new(0))
497                .fetch_add(1, Ordering::Relaxed);
498        }
499    }
500}
501
/// Snapshot of current metrics.
///
/// Plain-value copy of selected collector state, detached from the live
/// counters so it can be cloned, logged or inspected freely.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    // Request metrics
    /// Requests started
    pub total_requests: u64,
    /// Requests completed successfully
    pub successful_requests: u64,
    /// Requests completed with a failure
    pub failed_requests: u64,
    /// Requests in flight when the snapshot was taken
    pub active_requests: usize,

    // Connection metrics
    /// Connection events recorded
    pub total_connections: u64,
    /// Value of the active-connection gauge
    pub active_connections: usize,
    /// Connections served from the pool
    pub pool_hits: u64,
    /// Connections that missed the pool
    pub pool_misses: u64,

    // Performance metrics
    /// Mean request latency; `None` when no samples exist
    pub avg_latency: Option<Duration>,
    /// 95th percentile request latency; `None` when no samples exist
    pub p95_latency: Option<Duration>,
    /// 99th percentile request latency; `None` when no samples exist
    pub p99_latency: Option<Duration>,
    /// Request rate reported by the throughput tracker
    pub requests_per_second: f64,

    // Error metrics
    /// Errors recorded in total
    pub total_errors: u64,
    /// Value of the timeout error counter
    pub timeout_errors: u64,
    /// Value of the connection error counter
    pub connection_errors: u64,

    // Resource metrics
    /// Copy of the last published buffer pool statistics
    pub buffer_pool_stats: BufferPoolStats,
    /// Copy of the last published parser cache statistics
    pub parser_cache_stats: ParserCacheStats,
    /// Value of the memory usage counter
    pub memory_usage: u64,

    /// When the snapshot was taken
    pub timestamp: Instant,
}
535
/// Health check system.
///
/// Compares a snapshot of a shared [`MetricsCollector`] against a set of
/// [`HealthThresholds`].
pub struct HealthChecker {
    /// Collector whose snapshot is evaluated
    metrics: Arc<MetricsCollector>,
    /// Limits the snapshot is compared against
    thresholds: HealthThresholds,
}
541
/// Limits used by [`HealthChecker`] to judge a metrics snapshot.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    pub max_error_rate: f64,           // Maximum error rate (0.0-1.0)
    pub max_avg_latency: Duration,     // Maximum average latency
    pub max_p95_latency: Duration,     // Maximum P95 latency
    pub min_success_rate: f64,         // Minimum success rate (0.0-1.0)
    pub max_active_connections: usize, // Maximum active connections
}
550
impl Default for HealthThresholds {
    /// Default limits: 5% error rate, 500 ms average latency, 2 s P95
    /// latency, 95% success rate and 1000 active connections.
    fn default() -> Self {
        Self {
            max_error_rate: 0.05,                        // 5% error rate
            max_avg_latency: Duration::from_millis(500), // 500ms avg
            max_p95_latency: Duration::from_secs(2),     // 2s P95
            min_success_rate: 0.95,                      // 95% success rate
            max_active_connections: 1000,                // 1000 active connections
        }
    }
}
562
/// Overall verdict of a health check.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    /// No threshold was exceeded
    Healthy,
    /// A threshold was exceeded, but none that the checker treats as critical
    Warning,
    /// At least one threshold that the checker treats as critical was exceeded
    Critical,
}
569
/// Result of a single health check.
pub struct HealthReport {
    /// Overall verdict
    pub status: HealthStatus,
    /// One human-readable message per exceeded threshold; empty when healthy
    pub issues: Vec<String>,
    /// The metrics snapshot the verdict was derived from
    pub snapshot: MetricsSnapshot,
}
575
576impl HealthChecker {
577    pub fn new(metrics: Arc<MetricsCollector>) -> Self {
578        Self {
579            metrics,
580            thresholds: HealthThresholds::default(),
581        }
582    }
583
584    pub const fn with_thresholds(mut self, thresholds: HealthThresholds) -> Self {
585        self.thresholds = thresholds;
586        self
587    }
588
589    pub fn check_health(&self) -> HealthReport {
590        let snapshot = self.metrics.snapshot();
591        let mut issues = Vec::new();
592        let mut status = HealthStatus::Healthy;
593
594        // Check error rate
595        if snapshot.total_requests > 0 {
596            let error_rate = snapshot.failed_requests as f64 / snapshot.total_requests as f64;
597            if error_rate > self.thresholds.max_error_rate {
598                issues.push(format!(
599                    "High error rate: {:.2}% (threshold: {:.2}%)",
600                    error_rate * 100.0,
601                    self.thresholds.max_error_rate * 100.0
602                ));
603                status = HealthStatus::Critical;
604            }
605        }
606
607        // Check latency
608        if let Some(avg_latency) = snapshot.avg_latency {
609            if avg_latency > self.thresholds.max_avg_latency {
610                issues.push(format!(
611                    "High average latency: {}ms (threshold: {}ms)",
612                    avg_latency.as_millis(),
613                    self.thresholds.max_avg_latency.as_millis()
614                ));
615                if status == HealthStatus::Healthy {
616                    status = HealthStatus::Warning;
617                }
618            }
619        }
620
621        if let Some(p95_latency) = snapshot.p95_latency {
622            if p95_latency > self.thresholds.max_p95_latency {
623                issues.push(format!(
624                    "High P95 latency: {}ms (threshold: {}ms)",
625                    p95_latency.as_millis(),
626                    self.thresholds.max_p95_latency.as_millis()
627                ));
628                status = HealthStatus::Critical;
629            }
630        }
631
632        // Check active connections
633        if snapshot.active_connections > self.thresholds.max_active_connections {
634            issues.push(format!(
635                "Too many active connections: {} (threshold: {})",
636                snapshot.active_connections, self.thresholds.max_active_connections
637            ));
638            if status == HealthStatus::Healthy {
639                status = HealthStatus::Warning;
640            }
641        }
642
643        HealthReport {
644            status,
645            issues,
646            snapshot,
647        }
648    }
649}
650
651// Global metrics instance
652use std::sync::OnceLock;
653
/// Process-wide collector, created on first access by [`global_metrics`].
static GLOBAL_METRICS: OnceLock<Arc<MetricsCollector>> = OnceLock::new();

/// Get global metrics collector.
///
/// The collector is created on the first call; every later call returns a
/// reference to the same shared instance.
pub fn global_metrics() -> &'static Arc<MetricsCollector> {
    GLOBAL_METRICS.get_or_init(|| Arc::new(MetricsCollector::new()))
}
660
/// Initialize metrics system.
///
/// Returns a new `Arc` handle to the global collector, creating the collector
/// if it does not exist yet. Calling this more than once is harmless: every
/// call yields a handle to the same instance.
pub fn init_metrics() -> Arc<MetricsCollector> {
    Arc::clone(global_metrics())
}
665
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Count, mean and median over three evenly spaced samples.
    #[test]
    fn test_latency_tracker() {
        let mut latencies = LatencyTracker::new(100);
        for millis in [100, 200, 300] {
            latencies.record(Duration::from_millis(millis));
        }

        let middle = Duration::from_millis(200);
        assert_eq!(latencies.count(), 3);
        assert_eq!(latencies.average(), Some(middle));
        assert_eq!(latencies.percentile(50.0), Some(middle));
    }

    /// Recording requests must work and the reported rate must never be negative.
    #[test]
    fn test_throughput_tracker() {
        let mut throughput = ThroughputTracker::new(Duration::from_secs(1), 10);
        for _ in 0..3 {
            throughput.record_request();
        }

        // Note: actual RPS calculation depends on timing
        assert!(throughput.requests_per_second() >= 0.0);
    }

    /// A request that is started and then succeeds shows up in the snapshot
    /// and leaves nothing in flight.
    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        let request = collector.request_start("GET");
        thread::sleep(Duration::from_millis(10));
        request.success(200);

        let observed = collector.snapshot();
        assert_eq!(observed.total_requests, 1);
        assert_eq!(observed.successful_requests, 1);
        assert_eq!(observed.active_requests, 0);
    }

    /// A collector without any traffic is reported as healthy.
    #[test]
    fn test_health_checker() {
        let report = HealthChecker::new(Arc::new(MetricsCollector::new())).check_health();

        assert_eq!(report.status, HealthStatus::Healthy);
        assert!(report.issues.is_empty());
    }
}