1use std::collections::{HashMap, VecDeque};
8use std::sync::{
9 atomic::{AtomicU64, Ordering},
10 Arc, Mutex, RwLock,
11};
12use std::time::{Duration, SystemTime};
13use uuid::Uuid;
14
15use crate::fault_core::{CircuitBreakerState, CircuitBreakerStats, FaultSeverity};
16
17use super::error_types::CircuitBreakerError;
18
19#[derive(Debug)]
21pub struct CircuitBreakerStatsTracker {
22 request_counters: Arc<Mutex<RequestCounters>>,
24 response_times: Arc<Mutex<ResponseTimeTracker>>,
26 error_tracker: Arc<Mutex<ErrorTracker>>,
28 state_transitions: Arc<Mutex<StateTransitionTracker>>,
30 health_metrics: Arc<Mutex<HealthMetrics>>,
32}
33
34#[derive(Debug, Default)]
36pub struct RequestCounters {
37 pub total: AtomicU64,
39 pub successful: AtomicU64,
41 pub failed: AtomicU64,
43 pub rejected: AtomicU64,
45 pub timeout: AtomicU64,
47 pub in_progress: AtomicU64,
49 pub half_open_requests: AtomicU64,
51 pub half_open_successes: AtomicU64,
53}
54
55#[derive(Debug)]
57pub struct ResponseTimeTracker {
58 pub history: VecDeque<Duration>,
60 pub window_size: usize,
62 pub sum: Duration,
64 pub min: Option<Duration>,
66 pub max: Option<Duration>,
68 pub percentiles: HashMap<u8, Duration>,
70}
71
72#[derive(Debug, Default)]
74pub struct ErrorTracker {
75 pub error_counts: HashMap<String, u64>,
77 pub error_patterns: Vec<ErrorPattern>,
79 pub recent_errors: VecDeque<ErrorEvent>,
81 pub error_rate: f64,
83}
84
85#[derive(Debug, Clone)]
87pub struct ErrorPattern {
88 pub id: String,
90 pub name: String,
92 pub pattern: String,
94 pub count: u64,
96 pub first_seen: SystemTime,
98 pub last_seen: SystemTime,
100 pub severity: FaultSeverity,
102}
103
104#[derive(Debug, Clone)]
106pub struct ErrorEvent {
107 pub timestamp: SystemTime,
109 pub error_type: String,
111 pub message: String,
113 pub severity: FaultSeverity,
115 pub context: RequestContext,
117}
118
119#[derive(Debug, Clone)]
121pub struct RequestContext {
122 pub request_id: String,
124 pub user_id: Option<String>,
126 pub session_id: Option<String>,
128 pub metadata: HashMap<String, String>,
130}
131
132#[derive(Debug)]
134pub struct StateTransitionTracker {
135 pub transitions: VecDeque<StateTransition>,
137 pub current_state_start: SystemTime,
139 pub state_durations: HashMap<CircuitBreakerState, Duration>,
141 pub transition_counts: HashMap<(CircuitBreakerState, CircuitBreakerState), u64>,
143}
144
145#[derive(Debug, Clone)]
147pub struct StateTransition {
148 pub timestamp: SystemTime,
150 pub from_state: CircuitBreakerState,
152 pub to_state: CircuitBreakerState,
154 pub reason: TransitionReason,
156 pub metadata: HashMap<String, String>,
158}
159
160#[derive(Debug, Clone)]
162pub enum TransitionReason {
163 FailureThresholdExceeded,
165 RecoveryTimeoutElapsed,
167 SuccessfulRecoveryTest,
169 FailedRecoveryTest,
171 ManualOverride,
173 HealthCheckFailure,
175 PolicyViolation,
177 Custom(String),
179}
180
181#[derive(Debug, Clone)]
183pub struct HealthMetrics {
184 pub health_score: f64,
186 pub availability: f64,
188 pub reliability: f64,
190 pub performance_score: f64,
192 pub last_health_check: SystemTime,
194 pub health_trend: HealthTrend,
196}
197
198#[derive(Debug, Clone, PartialEq)]
200pub enum HealthTrend {
201 Improving,
203 Stable,
205 Degrading,
207 Critical,
209 Unknown,
211}
212
213#[derive(Debug)]
215pub struct CircuitBreakerStatsAggregator {
216 stats: Arc<RwLock<AggregatedStats>>,
218 config: AggregationConfig,
220}
221
222#[derive(Debug, Default)]
224pub struct AggregatedStats {
225 pub total_breakers: u64,
227 pub active_breakers: u64,
229 pub open_breakers: u64,
231 pub half_open_breakers: u64,
233 pub global_requests: RequestCounters,
235 pub global_health: f64,
237}
238
239#[derive(Debug, Clone)]
241pub struct AggregationConfig {
242 pub interval: Duration,
244 pub real_time: bool,
246 pub algorithms: Vec<String>,
248}
249
250#[derive(Debug, Clone, PartialEq)]
252pub enum RequestResult {
253 Success,
255 Failure,
257 Timeout,
259 Rejected,
261}
262
263impl Default for CircuitBreakerStatsTracker {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269impl CircuitBreakerStatsTracker {
270 #[must_use]
272 pub fn new() -> Self {
273 Self {
274 request_counters: Arc::new(Mutex::new(RequestCounters::default())),
275 response_times: Arc::new(Mutex::new(ResponseTimeTracker::default())),
276 error_tracker: Arc::new(Mutex::new(ErrorTracker::default())),
277 state_transitions: Arc::new(Mutex::new(StateTransitionTracker::default())),
278 health_metrics: Arc::new(Mutex::new(HealthMetrics::default())),
279 }
280 }
281
282 pub fn record_success(&self, duration: Duration) {
284 let counters = self.request_counters.lock().unwrap();
285 counters.successful.fetch_add(1, Ordering::Relaxed);
286 counters.total.fetch_add(1, Ordering::Relaxed);
287
288 let mut response_times = self.response_times.lock().unwrap();
289 response_times.history.push_back(duration);
290 if response_times.history.len() > response_times.window_size {
291 response_times.history.pop_front();
292 }
293
294 response_times.min = Some(response_times.min.map_or(duration, |min| min.min(duration)));
296 response_times.max = Some(response_times.max.map_or(duration, |max| max.max(duration)));
297
298 response_times.sum += duration;
300 if response_times.history.len() > response_times.window_size {
301 let first_duration = response_times.history[0];
302 response_times.sum -= first_duration;
303 }
304 }
305
306 pub fn record_failure(&self, error: &CircuitBreakerError) {
308 let counters = self.request_counters.lock().unwrap();
309 counters.failed.fetch_add(1, Ordering::Relaxed);
310 counters.total.fetch_add(1, Ordering::Relaxed);
311
312 let mut error_tracker = self.error_tracker.lock().unwrap();
313 let error_type = format!("{error:?}");
314 *error_tracker
315 .error_counts
316 .entry(error_type.clone())
317 .or_insert(0) += 1;
318
319 error_tracker.recent_errors.push_back(ErrorEvent {
320 timestamp: SystemTime::now(),
321 error_type,
322 message: error.to_string(),
323 severity: FaultSeverity::Medium,
324 context: RequestContext {
325 request_id: Uuid::new_v4().to_string(),
326 user_id: None,
327 session_id: None,
328 metadata: HashMap::new(),
329 },
330 });
331
332 if error_tracker.recent_errors.len() > 1000 {
334 error_tracker.recent_errors.pop_front();
335 }
336 }
337
338 pub fn record_rejection(&self) {
340 let counters = self.request_counters.lock().unwrap();
341 counters.rejected.fetch_add(1, Ordering::Relaxed);
342 counters.total.fetch_add(1, Ordering::Relaxed);
343 }
344
345 pub fn record_timeout(&self) {
347 let counters = self.request_counters.lock().unwrap();
348 counters.timeout.fetch_add(1, Ordering::Relaxed);
349 counters.total.fetch_add(1, Ordering::Relaxed);
350 }
351
352 pub fn track_half_open_request(&self) {
354 let counters = self.request_counters.lock().unwrap();
355 counters.half_open_requests.fetch_add(1, Ordering::Relaxed);
356 }
357
358 pub fn track_half_open_success(&self) {
360 let counters = self.request_counters.lock().unwrap();
361 counters.half_open_successes.fetch_add(1, Ordering::Relaxed);
362 }
363
364 pub fn reset_half_open_counters(&self) {
366 let counters = self.request_counters.lock().unwrap();
367 counters.half_open_requests.store(0, Ordering::Relaxed);
368 counters.half_open_successes.store(0, Ordering::Relaxed);
369 }
370
371 pub fn get_half_open_successes(&self) -> u64 {
373 let counters = self.request_counters.lock().unwrap();
374 counters.half_open_successes.load(Ordering::Relaxed)
375 }
376
377 pub fn record_state_transition(
379 &self,
380 from_state: CircuitBreakerState,
381 to_state: CircuitBreakerState,
382 reason: TransitionReason,
383 ) {
384 let mut transitions = self.state_transitions.lock().unwrap();
385
386 let transition = StateTransition {
387 timestamp: SystemTime::now(),
388 from_state,
389 to_state,
390 reason,
391 metadata: HashMap::new(),
392 };
393
394 transitions.transitions.push_back(transition);
395
396 let key = (from_state, to_state);
398 *transitions.transition_counts.entry(key).or_insert(0) += 1;
399
400 if transitions.transitions.len() > 100 {
402 transitions.transitions.pop_front();
403 }
404
405 transitions.current_state_start = SystemTime::now();
407 }
408
409 pub fn update_health_metrics(&self, health_score: f64, availability: f64, reliability: f64) {
411 let mut health = self.health_metrics.lock().unwrap();
412
413 let previous_score = health.health_score;
414 health.health_score = health_score;
415 health.availability = availability;
416 health.reliability = reliability;
417 health.last_health_check = SystemTime::now();
418
419 health.health_trend = if health_score > previous_score + 0.1 {
421 HealthTrend::Improving
422 } else if health_score < previous_score - 0.1 {
423 HealthTrend::Degrading
424 } else if health_score < 0.3 {
425 HealthTrend::Critical
426 } else {
427 HealthTrend::Stable
428 };
429 }
430
431 pub fn reset(&self) {
433 let counters = self.request_counters.lock().unwrap();
434 counters.total.store(0, Ordering::Relaxed);
435 counters.successful.store(0, Ordering::Relaxed);
436 counters.failed.store(0, Ordering::Relaxed);
437 counters.rejected.store(0, Ordering::Relaxed);
438 counters.timeout.store(0, Ordering::Relaxed);
439 counters.in_progress.store(0, Ordering::Relaxed);
440 counters.half_open_requests.store(0, Ordering::Relaxed);
441 counters.half_open_successes.store(0, Ordering::Relaxed);
442
443 let mut response_times = self.response_times.lock().unwrap();
444 response_times.history.clear();
445 response_times.sum = Duration::default();
446 response_times.min = None;
447 response_times.max = None;
448 response_times.percentiles.clear();
449
450 let mut error_tracker = self.error_tracker.lock().unwrap();
451 error_tracker.error_counts.clear();
452 error_tracker.recent_errors.clear();
453 error_tracker.error_rate = 0.0;
454
455 let mut transitions = self.state_transitions.lock().unwrap();
456 transitions.transitions.clear();
457 transitions.transition_counts.clear();
458
459 let mut health = self.health_metrics.lock().unwrap();
460 *health = HealthMetrics::default();
461 }
462
463 #[must_use]
465 pub fn get_stats(&self) -> CircuitBreakerStats {
466 let counters = self.request_counters.lock().unwrap();
467 CircuitBreakerStats {
469 total_requests: counters.total.load(Ordering::Relaxed),
470 successful_requests: counters.successful.load(Ordering::Relaxed),
471 failed_requests: counters.failed.load(Ordering::Relaxed),
472 consecutive_failures: 0, state_changes: 0, last_failure_time: None,
475 last_success_time: None,
476 half_open_requests: counters.half_open_requests.load(Ordering::Relaxed),
477 half_open_successes: counters.half_open_successes.load(Ordering::Relaxed),
478 }
479 }
480
481 #[must_use]
483 pub fn get_request_counters(&self) -> RequestCounters {
484 let counters = self.request_counters.lock().unwrap();
485 RequestCounters {
487 total: AtomicU64::new(counters.total.load(Ordering::Relaxed)),
488 successful: AtomicU64::new(counters.successful.load(Ordering::Relaxed)),
489 failed: AtomicU64::new(counters.failed.load(Ordering::Relaxed)),
490 rejected: AtomicU64::new(counters.rejected.load(Ordering::Relaxed)),
491 timeout: AtomicU64::new(counters.timeout.load(Ordering::Relaxed)),
492 in_progress: AtomicU64::new(counters.in_progress.load(Ordering::Relaxed)),
493 half_open_requests: AtomicU64::new(counters.half_open_requests.load(Ordering::Relaxed)),
494 half_open_successes: AtomicU64::new(
495 counters.half_open_successes.load(Ordering::Relaxed),
496 ),
497 }
498 }
499
500 #[must_use]
502 pub fn get_response_time_stats(&self) -> ResponseTimeStats {
503 let response_times = self.response_times.lock().unwrap();
504
505 let avg = if response_times.history.is_empty() {
506 Duration::default()
507 } else {
508 response_times.sum / response_times.history.len() as u32
509 };
510
511 ResponseTimeStats {
513 average: avg,
514 min: response_times.min,
515 max: response_times.max,
516 percentiles: response_times.percentiles.clone(),
517 sample_count: response_times.history.len(),
518 }
519 }
520
521 #[must_use]
523 pub fn get_error_stats(&self) -> ErrorStats {
524 let error_tracker = self.error_tracker.lock().unwrap();
525
526 ErrorStats {
528 error_counts: error_tracker.error_counts.clone(),
529 error_rate: error_tracker.error_rate,
530 recent_error_count: error_tracker.recent_errors.len(),
531 patterns: error_tracker.error_patterns.clone(),
532 }
533 }
534
535 #[must_use]
537 pub fn get_health_metrics(&self) -> HealthMetrics {
538 let health = self.health_metrics.lock().unwrap();
539 (*health).clone()
540 }
541}
542
543impl CircuitBreakerStatsAggregator {
544 #[must_use]
546 pub fn new(config: AggregationConfig) -> Self {
547 Self {
548 stats: Arc::new(RwLock::new(AggregatedStats::default())),
549 config,
550 }
551 }
552
553 pub fn update_stats(&self, breaker_stats: &[CircuitBreakerStats]) {
555 let mut stats = self.stats.write().unwrap();
556
557 stats.total_breakers = breaker_stats.len() as u64;
558 stats.active_breakers = breaker_stats.len() as u64; let mut total_requests = 0;
562 let mut successful_requests = 0;
563 let mut failed_requests = 0;
564
565 for stat in breaker_stats {
566 total_requests += stat.total_requests;
567 successful_requests += stat.successful_requests;
568 failed_requests += stat.failed_requests;
569 }
570
571 stats
572 .global_requests
573 .total
574 .store(total_requests, Ordering::Relaxed);
575 stats
576 .global_requests
577 .successful
578 .store(successful_requests, Ordering::Relaxed);
579 stats
580 .global_requests
581 .failed
582 .store(failed_requests, Ordering::Relaxed);
583
584 stats.global_health = if total_requests > 0 {
586 successful_requests as f64 / total_requests as f64
587 } else {
588 1.0
589 };
590 }
591
592 #[must_use]
594 pub fn get_stats(&self) -> AggregatedStats {
595 let stats = self.stats.read().unwrap();
596 AggregatedStats {
598 total_breakers: stats.total_breakers,
599 active_breakers: stats.active_breakers,
600 open_breakers: stats.open_breakers,
601 half_open_breakers: stats.half_open_breakers,
602 global_requests: RequestCounters {
603 total: AtomicU64::new(stats.global_requests.total.load(Ordering::Relaxed)),
604 successful: AtomicU64::new(
605 stats.global_requests.successful.load(Ordering::Relaxed),
606 ),
607 failed: AtomicU64::new(stats.global_requests.failed.load(Ordering::Relaxed)),
608 rejected: AtomicU64::new(stats.global_requests.rejected.load(Ordering::Relaxed)),
609 timeout: AtomicU64::new(stats.global_requests.timeout.load(Ordering::Relaxed)),
610 in_progress: AtomicU64::new(
611 stats.global_requests.in_progress.load(Ordering::Relaxed),
612 ),
613 half_open_requests: AtomicU64::new(
614 stats
615 .global_requests
616 .half_open_requests
617 .load(Ordering::Relaxed),
618 ),
619 half_open_successes: AtomicU64::new(
620 stats
621 .global_requests
622 .half_open_successes
623 .load(Ordering::Relaxed),
624 ),
625 },
626 global_health: stats.global_health,
627 }
628 }
629
630 pub fn reset(&self) {
632 let mut stats = self.stats.write().unwrap();
633 *stats = AggregatedStats::default();
634 }
635}
636
637#[derive(Debug, Clone)]
639pub struct ResponseTimeStats {
640 pub average: Duration,
642 pub min: Option<Duration>,
644 pub max: Option<Duration>,
646 pub percentiles: HashMap<u8, Duration>,
648 pub sample_count: usize,
650}
651
652#[derive(Debug, Clone)]
654pub struct ErrorStats {
655 pub error_counts: HashMap<String, u64>,
657 pub error_rate: f64,
659 pub recent_error_count: usize,
661 pub patterns: Vec<ErrorPattern>,
663}
664
665impl Default for ResponseTimeTracker {
666 fn default() -> Self {
667 Self {
668 history: VecDeque::new(),
669 window_size: 1000, sum: Duration::default(),
671 min: None,
672 max: None,
673 percentiles: HashMap::new(),
674 }
675 }
676}
677
678impl Default for AggregationConfig {
679 fn default() -> Self {
680 Self {
681 interval: Duration::from_secs(60),
682 real_time: true,
683 algorithms: vec!["simple".to_string()],
684 }
685 }
686}
687
688impl Default for HealthMetrics {
689 fn default() -> Self {
690 Self {
691 health_score: 1.0,
692 availability: 1.0,
693 reliability: 1.0,
694 performance_score: 1.0,
695 last_health_check: SystemTime::now(),
696 health_trend: HealthTrend::Unknown,
697 }
698 }
699}
700
701impl Default for StateTransitionTracker {
702 fn default() -> Self {
703 Self {
704 transitions: VecDeque::new(),
705 current_state_start: SystemTime::now(),
706 state_durations: HashMap::new(),
707 transition_counts: HashMap::new(),
708 }
709 }
710}