// sklears_compose/circuit_breaker/statistics_tracking.rs

//! Circuit Breaker Statistics Tracking
//!
//! This module provides comprehensive statistics tracking for circuit breakers,
//! including request counters, response time tracking, error tracking, state
//! transition monitoring, health metrics, and global statistics aggregation.

use std::collections::{HashMap, VecDeque};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, Mutex, RwLock,
};
use std::time::{Duration, SystemTime};
use uuid::Uuid;

use crate::fault_core::{CircuitBreakerState, CircuitBreakerStats, FaultSeverity};

use super::error_types::CircuitBreakerError;

19/// Circuit breaker statistics tracker
20#[derive(Debug)]
21pub struct CircuitBreakerStatsTracker {
22    /// Request counters
23    request_counters: Arc<Mutex<RequestCounters>>,
24    /// Response time tracker
25    response_times: Arc<Mutex<ResponseTimeTracker>>,
26    /// Error tracker
27    error_tracker: Arc<Mutex<ErrorTracker>>,
28    /// State transition tracker
29    state_transitions: Arc<Mutex<StateTransitionTracker>>,
30    /// Health metrics
31    health_metrics: Arc<Mutex<HealthMetrics>>,
32}
33
/// Request counters.
///
/// Every counter is an `AtomicU64`, so it can be updated through a shared
/// reference. The derived `Default` starts all counters at zero.
#[derive(Debug, Default)]
pub struct RequestCounters {
    /// Total requests
    pub total: AtomicU64,
    /// Successful requests
    pub successful: AtomicU64,
    /// Failed requests
    pub failed: AtomicU64,
    /// Rejected requests (circuit open)
    pub rejected: AtomicU64,
    /// Timeout requests
    pub timeout: AtomicU64,
    /// Requests in progress
    pub in_progress: AtomicU64,
    /// Half-open state requests (for monitoring recovery attempts)
    pub half_open_requests: AtomicU64,
    /// Half-open state successful requests
    pub half_open_successes: AtomicU64,
}

/// Response time tracker.
///
/// Holds a bounded window of recent response times together with the
/// aggregates derived from it.
#[derive(Debug)]
pub struct ResponseTimeTracker {
    /// Response time history (sliding window, oldest sample at the front)
    pub history: VecDeque<Duration>,
    /// Maximum number of samples kept in `history`
    pub window_size: usize,
    /// Sum of response times in window
    pub sum: Duration,
    /// Minimum response time (`None` until a sample is recorded)
    pub min: Option<Duration>,
    /// Maximum response time (`None` until a sample is recorded)
    pub max: Option<Duration>,
    /// Percentiles cache, keyed by percentile (e.g. `95`)
    pub percentiles: HashMap<u8, Duration>,
}

72/// Error tracker
73#[derive(Debug, Default)]
74pub struct ErrorTracker {
75    /// Error counts by type
76    pub error_counts: HashMap<String, u64>,
77    /// Error patterns
78    pub error_patterns: Vec<ErrorPattern>,
79    /// Recent errors (sliding window)
80    pub recent_errors: VecDeque<ErrorEvent>,
81    /// Error rate calculation
82    pub error_rate: f64,
83}
84
85/// Error pattern
86#[derive(Debug, Clone)]
87pub struct ErrorPattern {
88    /// Pattern identifier
89    pub id: String,
90    /// Pattern name
91    pub name: String,
92    /// Pattern regex
93    pub pattern: String,
94    /// Match count
95    pub count: u64,
96    /// First occurrence
97    pub first_seen: SystemTime,
98    /// Last occurrence
99    pub last_seen: SystemTime,
100    /// Pattern severity
101    pub severity: FaultSeverity,
102}
103
104/// Error event
105#[derive(Debug, Clone)]
106pub struct ErrorEvent {
107    /// Event timestamp
108    pub timestamp: SystemTime,
109    /// Error type
110    pub error_type: String,
111    /// Error message
112    pub message: String,
113    /// Error severity
114    pub severity: FaultSeverity,
115    /// Request context
116    pub context: RequestContext,
117}
118
/// Request context attached to a recorded error event.
#[derive(Debug, Clone)]
pub struct RequestContext {
    /// Request identifier
    pub request_id: String,
    /// User identifier, if known
    pub user_id: Option<String>,
    /// Session identifier, if known
    pub session_id: Option<String>,
    /// Request metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
}

132/// State transition tracker
133#[derive(Debug)]
134pub struct StateTransitionTracker {
135    /// Transition history
136    pub transitions: VecDeque<StateTransition>,
137    /// Current state start time
138    pub current_state_start: SystemTime,
139    /// State durations
140    pub state_durations: HashMap<CircuitBreakerState, Duration>,
141    /// Transition counts
142    pub transition_counts: HashMap<(CircuitBreakerState, CircuitBreakerState), u64>,
143}
144
145/// State transition
146#[derive(Debug, Clone)]
147pub struct StateTransition {
148    /// Transition timestamp
149    pub timestamp: SystemTime,
150    /// Previous state
151    pub from_state: CircuitBreakerState,
152    /// New state
153    pub to_state: CircuitBreakerState,
154    /// Transition reason
155    pub reason: TransitionReason,
156    /// Transition metadata
157    pub metadata: HashMap<String, String>,
158}
159
/// Transition reason enumeration: why a state transition happened.
#[derive(Debug, Clone)]
pub enum TransitionReason {
    /// Failure threshold exceeded
    FailureThresholdExceeded,
    /// Recovery timeout elapsed
    RecoveryTimeoutElapsed,
    /// Successful recovery test
    SuccessfulRecoveryTest,
    /// Failed recovery test
    FailedRecoveryTest,
    /// Manual override
    ManualOverride,
    /// Health check failure
    HealthCheckFailure,
    /// Policy violation
    PolicyViolation,
    /// Custom reason, described by the contained text
    Custom(String),
}

181/// Health metrics
182#[derive(Debug, Clone)]
183pub struct HealthMetrics {
184    /// Overall health score (0.0 to 1.0)
185    pub health_score: f64,
186    /// Availability (0.0 to 1.0)
187    pub availability: f64,
188    /// Reliability (0.0 to 1.0)
189    pub reliability: f64,
190    /// Performance score (0.0 to 1.0)
191    pub performance_score: f64,
192    /// Last health check
193    pub last_health_check: SystemTime,
194    /// Health trend
195    pub health_trend: HealthTrend,
196}
197
/// Health trend enumeration: direction in which the health score is moving.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthTrend {
    /// Improving
    Improving,
    /// Stable
    Stable,
    /// Degrading
    Degrading,
    /// Critical
    Critical,
    /// Unknown (no trend has been determined yet)
    Unknown,
}

213/// Circuit breaker statistics aggregator for global statistics
214#[derive(Debug)]
215pub struct CircuitBreakerStatsAggregator {
216    /// Aggregated statistics
217    stats: Arc<RwLock<AggregatedStats>>,
218    /// Aggregation configuration
219    config: AggregationConfig,
220}
221
222/// Aggregated statistics structure
223#[derive(Debug, Default)]
224pub struct AggregatedStats {
225    /// Total circuit breakers
226    pub total_breakers: u64,
227    /// Active circuit breakers
228    pub active_breakers: u64,
229    /// Open circuit breakers
230    pub open_breakers: u64,
231    /// Half-open circuit breakers
232    pub half_open_breakers: u64,
233    /// Global request statistics
234    pub global_requests: RequestCounters,
235    /// Global health score
236    pub global_health: f64,
237}
238
/// Aggregation configuration.
#[derive(Debug, Clone)]
pub struct AggregationConfig {
    /// Aggregation interval
    pub interval: Duration,
    /// Enable real-time aggregation
    pub real_time: bool,
    /// Aggregation algorithms, identified by name
    pub algorithms: Vec<String>,
}

/// Request result enumeration for statistics tracking.
#[derive(Debug, Clone, PartialEq)]
pub enum RequestResult {
    /// Success
    Success,
    /// Failure
    Failure,
    /// Timeout
    Timeout,
    /// Rejected
    Rejected,
}

263impl Default for CircuitBreakerStatsTracker {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269impl CircuitBreakerStatsTracker {
270    /// Create a new statistics tracker
271    #[must_use]
272    pub fn new() -> Self {
273        Self {
274            request_counters: Arc::new(Mutex::new(RequestCounters::default())),
275            response_times: Arc::new(Mutex::new(ResponseTimeTracker::default())),
276            error_tracker: Arc::new(Mutex::new(ErrorTracker::default())),
277            state_transitions: Arc::new(Mutex::new(StateTransitionTracker::default())),
278            health_metrics: Arc::new(Mutex::new(HealthMetrics::default())),
279        }
280    }
281
282    /// Record a successful request
283    pub fn record_success(&self, duration: Duration) {
284        let counters = self.request_counters.lock().unwrap();
285        counters.successful.fetch_add(1, Ordering::Relaxed);
286        counters.total.fetch_add(1, Ordering::Relaxed);
287
288        let mut response_times = self.response_times.lock().unwrap();
289        response_times.history.push_back(duration);
290        if response_times.history.len() > response_times.window_size {
291            response_times.history.pop_front();
292        }
293
294        // Update min/max response times
295        response_times.min = Some(response_times.min.map_or(duration, |min| min.min(duration)));
296        response_times.max = Some(response_times.max.map_or(duration, |max| max.max(duration)));
297
298        // Update sum for average calculation
299        response_times.sum += duration;
300        if response_times.history.len() > response_times.window_size {
301            let first_duration = response_times.history[0];
302            response_times.sum -= first_duration;
303        }
304    }
305
306    /// Record a failed request
307    pub fn record_failure(&self, error: &CircuitBreakerError) {
308        let counters = self.request_counters.lock().unwrap();
309        counters.failed.fetch_add(1, Ordering::Relaxed);
310        counters.total.fetch_add(1, Ordering::Relaxed);
311
312        let mut error_tracker = self.error_tracker.lock().unwrap();
313        let error_type = format!("{error:?}");
314        *error_tracker
315            .error_counts
316            .entry(error_type.clone())
317            .or_insert(0) += 1;
318
319        error_tracker.recent_errors.push_back(ErrorEvent {
320            timestamp: SystemTime::now(),
321            error_type,
322            message: error.to_string(),
323            severity: FaultSeverity::Medium,
324            context: RequestContext {
325                request_id: Uuid::new_v4().to_string(),
326                user_id: None,
327                session_id: None,
328                metadata: HashMap::new(),
329            },
330        });
331
332        // Maintain sliding window for recent errors
333        if error_tracker.recent_errors.len() > 1000 {
334            error_tracker.recent_errors.pop_front();
335        }
336    }
337
338    /// Record a rejected request (circuit open)
339    pub fn record_rejection(&self) {
340        let counters = self.request_counters.lock().unwrap();
341        counters.rejected.fetch_add(1, Ordering::Relaxed);
342        counters.total.fetch_add(1, Ordering::Relaxed);
343    }
344
345    /// Record a timeout request
346    pub fn record_timeout(&self) {
347        let counters = self.request_counters.lock().unwrap();
348        counters.timeout.fetch_add(1, Ordering::Relaxed);
349        counters.total.fetch_add(1, Ordering::Relaxed);
350    }
351
352    /// Track a half-open request
353    pub fn track_half_open_request(&self) {
354        let counters = self.request_counters.lock().unwrap();
355        counters.half_open_requests.fetch_add(1, Ordering::Relaxed);
356    }
357
358    /// Track a half-open successful request
359    pub fn track_half_open_success(&self) {
360        let counters = self.request_counters.lock().unwrap();
361        counters.half_open_successes.fetch_add(1, Ordering::Relaxed);
362    }
363
364    /// Reset half-open counters
365    pub fn reset_half_open_counters(&self) {
366        let counters = self.request_counters.lock().unwrap();
367        counters.half_open_requests.store(0, Ordering::Relaxed);
368        counters.half_open_successes.store(0, Ordering::Relaxed);
369    }
370
371    /// Get current half-open success count
372    pub fn get_half_open_successes(&self) -> u64 {
373        let counters = self.request_counters.lock().unwrap();
374        counters.half_open_successes.load(Ordering::Relaxed)
375    }
376
377    /// Record state transition
378    pub fn record_state_transition(
379        &self,
380        from_state: CircuitBreakerState,
381        to_state: CircuitBreakerState,
382        reason: TransitionReason,
383    ) {
384        let mut transitions = self.state_transitions.lock().unwrap();
385
386        let transition = StateTransition {
387            timestamp: SystemTime::now(),
388            from_state,
389            to_state,
390            reason,
391            metadata: HashMap::new(),
392        };
393
394        transitions.transitions.push_back(transition);
395
396        // Update transition counts
397        let key = (from_state, to_state);
398        *transitions.transition_counts.entry(key).or_insert(0) += 1;
399
400        // Maintain sliding window for transitions
401        if transitions.transitions.len() > 100 {
402            transitions.transitions.pop_front();
403        }
404
405        // Update current state start time
406        transitions.current_state_start = SystemTime::now();
407    }
408
409    /// Update health metrics
410    pub fn update_health_metrics(&self, health_score: f64, availability: f64, reliability: f64) {
411        let mut health = self.health_metrics.lock().unwrap();
412
413        let previous_score = health.health_score;
414        health.health_score = health_score;
415        health.availability = availability;
416        health.reliability = reliability;
417        health.last_health_check = SystemTime::now();
418
419        // Determine trend
420        health.health_trend = if health_score > previous_score + 0.1 {
421            HealthTrend::Improving
422        } else if health_score < previous_score - 0.1 {
423            HealthTrend::Degrading
424        } else if health_score < 0.3 {
425            HealthTrend::Critical
426        } else {
427            HealthTrend::Stable
428        };
429    }
430
431    /// Reset all statistics
432    pub fn reset(&self) {
433        let counters = self.request_counters.lock().unwrap();
434        counters.total.store(0, Ordering::Relaxed);
435        counters.successful.store(0, Ordering::Relaxed);
436        counters.failed.store(0, Ordering::Relaxed);
437        counters.rejected.store(0, Ordering::Relaxed);
438        counters.timeout.store(0, Ordering::Relaxed);
439        counters.in_progress.store(0, Ordering::Relaxed);
440        counters.half_open_requests.store(0, Ordering::Relaxed);
441        counters.half_open_successes.store(0, Ordering::Relaxed);
442
443        let mut response_times = self.response_times.lock().unwrap();
444        response_times.history.clear();
445        response_times.sum = Duration::default();
446        response_times.min = None;
447        response_times.max = None;
448        response_times.percentiles.clear();
449
450        let mut error_tracker = self.error_tracker.lock().unwrap();
451        error_tracker.error_counts.clear();
452        error_tracker.recent_errors.clear();
453        error_tracker.error_rate = 0.0;
454
455        let mut transitions = self.state_transitions.lock().unwrap();
456        transitions.transitions.clear();
457        transitions.transition_counts.clear();
458
459        let mut health = self.health_metrics.lock().unwrap();
460        *health = HealthMetrics::default();
461    }
462
463    /// Get current statistics
464    #[must_use]
465    pub fn get_stats(&self) -> CircuitBreakerStats {
466        let counters = self.request_counters.lock().unwrap();
467        /// CircuitBreakerStats
468        CircuitBreakerStats {
469            total_requests: counters.total.load(Ordering::Relaxed),
470            successful_requests: counters.successful.load(Ordering::Relaxed),
471            failed_requests: counters.failed.load(Ordering::Relaxed),
472            consecutive_failures: 0, // Would need to track this separately
473            state_changes: 0,        // Would need to track this separately
474            last_failure_time: None,
475            last_success_time: None,
476            half_open_requests: counters.half_open_requests.load(Ordering::Relaxed),
477            half_open_successes: counters.half_open_successes.load(Ordering::Relaxed),
478        }
479    }
480
481    /// Get request counters
482    #[must_use]
483    pub fn get_request_counters(&self) -> RequestCounters {
484        let counters = self.request_counters.lock().unwrap();
485        /// RequestCounters
486        RequestCounters {
487            total: AtomicU64::new(counters.total.load(Ordering::Relaxed)),
488            successful: AtomicU64::new(counters.successful.load(Ordering::Relaxed)),
489            failed: AtomicU64::new(counters.failed.load(Ordering::Relaxed)),
490            rejected: AtomicU64::new(counters.rejected.load(Ordering::Relaxed)),
491            timeout: AtomicU64::new(counters.timeout.load(Ordering::Relaxed)),
492            in_progress: AtomicU64::new(counters.in_progress.load(Ordering::Relaxed)),
493            half_open_requests: AtomicU64::new(counters.half_open_requests.load(Ordering::Relaxed)),
494            half_open_successes: AtomicU64::new(
495                counters.half_open_successes.load(Ordering::Relaxed),
496            ),
497        }
498    }
499
500    /// Get response time statistics
501    #[must_use]
502    pub fn get_response_time_stats(&self) -> ResponseTimeStats {
503        let response_times = self.response_times.lock().unwrap();
504
505        let avg = if response_times.history.is_empty() {
506            Duration::default()
507        } else {
508            response_times.sum / response_times.history.len() as u32
509        };
510
511        /// ResponseTimeStats
512        ResponseTimeStats {
513            average: avg,
514            min: response_times.min,
515            max: response_times.max,
516            percentiles: response_times.percentiles.clone(),
517            sample_count: response_times.history.len(),
518        }
519    }
520
521    /// Get error statistics
522    #[must_use]
523    pub fn get_error_stats(&self) -> ErrorStats {
524        let error_tracker = self.error_tracker.lock().unwrap();
525
526        /// ErrorStats
527        ErrorStats {
528            error_counts: error_tracker.error_counts.clone(),
529            error_rate: error_tracker.error_rate,
530            recent_error_count: error_tracker.recent_errors.len(),
531            patterns: error_tracker.error_patterns.clone(),
532        }
533    }
534
535    /// Get health metrics
536    #[must_use]
537    pub fn get_health_metrics(&self) -> HealthMetrics {
538        let health = self.health_metrics.lock().unwrap();
539        (*health).clone()
540    }
541}
542
543impl CircuitBreakerStatsAggregator {
544    /// Create a new statistics aggregator
545    #[must_use]
546    pub fn new(config: AggregationConfig) -> Self {
547        Self {
548            stats: Arc::new(RwLock::new(AggregatedStats::default())),
549            config,
550        }
551    }
552
553    /// Update aggregated statistics
554    pub fn update_stats(&self, breaker_stats: &[CircuitBreakerStats]) {
555        let mut stats = self.stats.write().unwrap();
556
557        stats.total_breakers = breaker_stats.len() as u64;
558        stats.active_breakers = breaker_stats.len() as u64; // Simplified
559
560        // Aggregate request counts
561        let mut total_requests = 0;
562        let mut successful_requests = 0;
563        let mut failed_requests = 0;
564
565        for stat in breaker_stats {
566            total_requests += stat.total_requests;
567            successful_requests += stat.successful_requests;
568            failed_requests += stat.failed_requests;
569        }
570
571        stats
572            .global_requests
573            .total
574            .store(total_requests, Ordering::Relaxed);
575        stats
576            .global_requests
577            .successful
578            .store(successful_requests, Ordering::Relaxed);
579        stats
580            .global_requests
581            .failed
582            .store(failed_requests, Ordering::Relaxed);
583
584        // Calculate global health score
585        stats.global_health = if total_requests > 0 {
586            successful_requests as f64 / total_requests as f64
587        } else {
588            1.0
589        };
590    }
591
592    /// Get aggregated statistics
593    #[must_use]
594    pub fn get_stats(&self) -> AggregatedStats {
595        let stats = self.stats.read().unwrap();
596        /// AggregatedStats
597        AggregatedStats {
598            total_breakers: stats.total_breakers,
599            active_breakers: stats.active_breakers,
600            open_breakers: stats.open_breakers,
601            half_open_breakers: stats.half_open_breakers,
602            global_requests: RequestCounters {
603                total: AtomicU64::new(stats.global_requests.total.load(Ordering::Relaxed)),
604                successful: AtomicU64::new(
605                    stats.global_requests.successful.load(Ordering::Relaxed),
606                ),
607                failed: AtomicU64::new(stats.global_requests.failed.load(Ordering::Relaxed)),
608                rejected: AtomicU64::new(stats.global_requests.rejected.load(Ordering::Relaxed)),
609                timeout: AtomicU64::new(stats.global_requests.timeout.load(Ordering::Relaxed)),
610                in_progress: AtomicU64::new(
611                    stats.global_requests.in_progress.load(Ordering::Relaxed),
612                ),
613                half_open_requests: AtomicU64::new(
614                    stats
615                        .global_requests
616                        .half_open_requests
617                        .load(Ordering::Relaxed),
618                ),
619                half_open_successes: AtomicU64::new(
620                    stats
621                        .global_requests
622                        .half_open_successes
623                        .load(Ordering::Relaxed),
624                ),
625            },
626            global_health: stats.global_health,
627        }
628    }
629
630    /// Reset aggregated statistics
631    pub fn reset(&self) {
632        let mut stats = self.stats.write().unwrap();
633        *stats = AggregatedStats::default();
634    }
635}
636
/// Response time statistics summary.
#[derive(Debug, Clone)]
pub struct ResponseTimeStats {
    /// Average response time
    pub average: Duration,
    /// Minimum response time (`None` when no sample was recorded)
    pub min: Option<Duration>,
    /// Maximum response time (`None` when no sample was recorded)
    pub max: Option<Duration>,
    /// Percentile values, keyed by percentile
    pub percentiles: HashMap<u8, Duration>,
    /// Number of samples
    pub sample_count: usize,
}

652/// Error statistics summary
653#[derive(Debug, Clone)]
654pub struct ErrorStats {
655    /// Error counts by type
656    pub error_counts: HashMap<String, u64>,
657    /// Current error rate
658    pub error_rate: f64,
659    /// Count of recent errors
660    pub recent_error_count: usize,
661    /// Detected error patterns
662    pub patterns: Vec<ErrorPattern>,
663}
664
665impl Default for ResponseTimeTracker {
666    fn default() -> Self {
667        Self {
668            history: VecDeque::new(),
669            window_size: 1000, // Default window size
670            sum: Duration::default(),
671            min: None,
672            max: None,
673            percentiles: HashMap::new(),
674        }
675    }
676}
677
678impl Default for AggregationConfig {
679    fn default() -> Self {
680        Self {
681            interval: Duration::from_secs(60),
682            real_time: true,
683            algorithms: vec!["simple".to_string()],
684        }
685    }
686}
687
688impl Default for HealthMetrics {
689    fn default() -> Self {
690        Self {
691            health_score: 1.0,
692            availability: 1.0,
693            reliability: 1.0,
694            performance_score: 1.0,
695            last_health_check: SystemTime::now(),
696            health_trend: HealthTrend::Unknown,
697        }
698    }
699}
700
701impl Default for StateTransitionTracker {
702    fn default() -> Self {
703        Self {
704            transitions: VecDeque::new(),
705            current_state_start: SystemTime::now(),
706            state_durations: HashMap::new(),
707            transition_counts: HashMap::new(),
708        }
709    }
710}