decrust_core/
circuit_breaker.rs

1/* src/circuit_breaker.rs */
2#![warn(missing_docs)]
3//! **Brief:** Circuit breaker implementation for resilience.
4// ~=####====A===r===c===M===o===o===n====S===t===u===d===i===o===s====X|0|$>
5//! + [Error Handling Framework]
6//!  - [Circuit Breaker Pattern]
7//!  - [Fault Tolerance]
8//!  - [Service Resilience]
9//!  - [Adaptive Thresholds]
10//!  - [Performance Monitoring]
11// ~=####====A===r===c===M===o===o===n====S===t===u===d===i===o===s====X|0|$>
12// **GitHub:** [ArcMoon Studios](https://github.com/arcmoonstudios)
13// **Copyright:** (c) 2025 ArcMoon Studios
14// **Author:** Lord Xyn
15// **License:** Business Source License 1.1 (BSL-1.1)
16// **License File:** /LICENSE
17// **License Terms:** Non-production use only; commercial/production use requires a paid license.
18// **Change Date:** 2029-05-25 | **Change License:** GPL v3
19// **Contact:** LordXyn@proton.me
20
//! This module provides the `CircuitBreaker` struct, which helps protect the system
//! from cascading failures when interacting with external services or performing
//! operations prone to repeated errors.
24
25use super::backtrace::DecrustBacktrace as Backtrace;
26use super::{DecrustError, Result};
27use std::collections::VecDeque;
28use std::fmt;
29use std::sync::{Arc, Mutex, RwLock};
30use std::time::{Duration, Instant, SystemTime};
31use tracing::info;
32
/// Wrapper that gives any value an opaque [`fmt::Debug`] representation.
///
/// Intended for fields (typically closures) whose type does not implement
/// `Debug`. The wrapped value is never inspected while formatting; the fixed
/// placeholder `<function>` is written instead.
pub struct DebugIgnore<T: ?Sized>(pub T);

impl<T: ?Sized> fmt::Debug for DebugIgnore<T> {
    /// Writes the literal placeholder `<function>`, whatever the wrapped value is.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("<function>")
    }
}

impl<T: Clone> Clone for DebugIgnore<T> {
    /// Clones the wrapped value and wraps the copy again.
    fn clone(&self) -> Self {
        let Self(wrapped) = self;
        Self(wrapped.clone())
    }
}
47
48#[cfg(feature = "rand")]
49#[allow(unused_imports)]
50use rand::Rng;
51#[cfg(feature = "tokio")]
52use tokio::time;
53
/// Represents the state of the circuit breaker.
///
/// `Closed` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum CircuitBreakerState {
    /// The circuit is closed, operations are allowed.
    #[default]
    Closed,
    /// The circuit is open, operations are rejected immediately.
    Open,
    /// The circuit is partially open, allowing a limited number of test operations.
    HalfOpen,
}

impl fmt::Display for CircuitBreakerState {
    /// Writes the bare variant name (`Closed`, `Open` or `HalfOpen`), i.e. the
    /// same text the derived `Debug` implementation produces.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            CircuitBreakerState::Closed => "Closed",
            CircuitBreakerState::Open => "Open",
            CircuitBreakerState::HalfOpen => "HalfOpen",
        };
        f.write_str(label)
    }
}
71
/// Type of operation outcome.
///
/// This is the classification handed to
/// `CircuitBreakerObserver::on_operation_result` for each call that goes
/// through (or is turned away by) the circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitOperationType {
    /// Operation completed successfully
    Success,
    /// Operation failed with an error
    Failure,
    /// Operation was rejected by the circuit breaker (e.g., when Open or HalfOpen limit reached)
    Rejected,
    /// Operation timed out
    Timeout,
}
84
/// Represents an event of state transition.
///
/// An instance is built for every transition and passed by reference to
/// `CircuitBreakerObserver::on_state_change`.
#[derive(Debug, Clone)]
pub struct CircuitTransitionEvent {
    /// The state the circuit breaker is transitioning from
    pub from_state: CircuitBreakerState,
    /// The state the circuit breaker is transitioning to
    pub to_state: CircuitBreakerState,
    /// When the transition occurred (wall-clock time, not a monotonic instant)
    pub timestamp: SystemTime,
    /// The reason for the state transition, as free-form text
    /// (e.g. `"Manual trip"` or `"Manual reset"`)
    pub reason: String,
}
97
/// Observer trait for circuit breaker events.
///
/// Implement this trait to react to state changes, operation results,
/// and other significant events from the circuit breaker.
///
/// Observers are registered through `CircuitBreaker::add_observer` and stored
/// as `Arc<dyn CircuitBreakerObserver>`; implementations must be `Send + Sync`.
/// In every callback `name` is presumably the name the breaker was created
/// with (the dispatching helpers are not part of this excerpt; confirm there).
pub trait CircuitBreakerObserver: Send + Sync {
    /// Called when the circuit breaker's state changes.
    ///
    /// `event` describes the previous state, the new state, when the change
    /// happened and why.
    fn on_state_change(&self, name: &str, event: &CircuitTransitionEvent);
    /// Called before an operation is attempted (if not rejected immediately).
    ///
    /// `state` is the breaker state observed for this attempt.
    ///
    /// NOTE(review): `CircuitBreaker::execute` calls `notify_operation_attempt`
    /// before it inspects the state, so, assuming that helper forwards to this
    /// method, the callback may also fire for calls that are subsequently
    /// rejected. Confirm which contract is intended.
    fn on_operation_attempt(&self, name: &str, state: CircuitBreakerState);
    /// Called after an operation completes or is rejected/timed out.
    ///
    /// * `op_type`: how the call ended.
    /// * `duration`: time attributed to the call.
    /// * `error`: the error involved, when there is one.
    fn on_operation_result(
        &self,
        name: &str,
        op_type: CircuitOperationType,
        duration: Duration,
        error: Option<&DecrustError>,
    );
    /// Called when the circuit breaker is manually reset.
    fn on_reset(&self, name: &str);
}
118
/// Metrics collected by the circuit breaker.
///
/// `CircuitBreaker::metrics` returns a clone of the current values, so a
/// caller holds a point-in-time snapshot rather than a live view.
#[derive(Debug, Clone, Default)]
pub struct CircuitMetrics {
    /// Current state of the circuit breaker
    pub state: CircuitBreakerState,
    /// Total number of requests processed by the circuit breaker
    pub total_requests: u64,
    /// Number of successful requests
    pub successful_requests: u64,
    /// Number of failed requests
    pub failed_requests: u64,
    /// Number of requests rejected due to circuit breaker being open
    pub rejected_requests: u64,
    /// Number of requests that timed out
    pub timeout_requests: u64,
    /// Current count of consecutive failures
    pub consecutive_failures: u32,
    /// Current count of consecutive successes
    pub consecutive_successes: u32,
    /// Timestamp of the last error that occurred (`None` until one is recorded)
    pub last_error_timestamp: Option<SystemTime>,
    /// Timestamp of the last state transition (`None` until one is recorded)
    pub last_transition_timestamp: Option<SystemTime>,
    /// Current failure rate calculated over the sliding window (0.0 to 1.0)
    pub failure_rate_in_window: Option<f64>,
    /// Current rate of slow calls calculated over the sliding window (0.0 to 1.0)
    pub slow_call_rate_in_window: Option<f64>,
}
147
/// Type alias for error predicate function.
///
/// A shareable, thread-safe closure that inspects a `DecrustError` and returns
/// `true` when that error should be considered a failure by the circuit
/// breaker (see `CircuitBreakerConfig::error_predicate`).
pub type ErrorPredicate = Arc<dyn Fn(&DecrustError) -> bool + Send + Sync>;
150
/// Configuration for the CircuitBreaker.
///
/// Defines thresholds and timeouts that control the behavior of the circuit breaker.
/// `Default::default()` provides a ready-to-use baseline; override individual
/// fields with struct-update syntax.
#[derive(Clone)]
pub struct CircuitBreakerConfig {
    /// The number of consecutive failures after which the circuit opens.
    pub failure_threshold: usize,
    /// The failure rate (0.0 to 1.0) within the sliding window that causes the circuit to open.
    pub failure_rate_threshold: f64,
    /// The minimum number of requests in the sliding window before the failure rate is considered.
    pub minimum_request_threshold_for_rate: usize,
    /// The number of consecutive successes required in HalfOpen state to transition to Closed.
    pub success_threshold_to_close: usize,
    /// The duration the circuit stays Open before transitioning to HalfOpen.
    pub reset_timeout: Duration,
    /// The maximum number of operations allowed to execute concurrently when in HalfOpen state.
    pub half_open_max_concurrent_operations: usize,
    /// Optional timeout for individual operations executed through the circuit breaker.
    /// `None` runs operations without a time limit.
    pub operation_timeout: Option<Duration>,
    /// The size of the sliding window used for calculating failure rates.
    pub sliding_window_size: usize,
    /// An optional predicate to determine if a specific `DecrustError` should be considered a failure.
    /// If `None`, all `Err` results are considered failures.
    pub error_predicate: Option<ErrorPredicate>,
    /// The size of the history window for detailed metrics.
    ///
    /// NOTE(review): earlier comments on this field disagreed with each other
    /// ("not fully implemented" vs. "used for the result/slow-call window size
    /// logic"). Confirm against the window-maintenance code which one holds.
    pub metrics_window_size: usize,
    /// Whether to track detailed metrics.
    pub track_metrics: bool,
    /// Threshold for an operation to be considered a "slow call".
    /// `None` disables slow-call classification.
    pub slow_call_duration_threshold: Option<Duration>,
    /// Rate of slow calls (0.0 to 1.0) in the window that can cause the circuit to open.
    pub slow_call_rate_threshold: Option<f64>,
    /// Number of consecutive failures before opening the circuit breaker.
    ///
    /// NOTE(review): overlaps in meaning with `failure_threshold`; verify which
    /// of the two the trip logic actually consults.
    pub circuit_breaker_threshold: u32,
    /// Duration the circuit breaker stays open after tripping.
    ///
    /// NOTE(review): overlaps in meaning with `reset_timeout`; verify which of
    /// the two the recovery logic actually consults.
    pub circuit_breaker_cooldown: Duration,
}
188
189impl Default for CircuitBreakerConfig {
190    fn default() -> Self {
191        Self {
192            failure_threshold: 5,
193            failure_rate_threshold: 0.5,
194            minimum_request_threshold_for_rate: 10,
195            success_threshold_to_close: 3,
196            reset_timeout: Duration::from_secs(30),
197            half_open_max_concurrent_operations: 1,
198            operation_timeout: Some(Duration::from_secs(5)),
199            sliding_window_size: 100,
200            error_predicate: None,
201            metrics_window_size: 100, // This could influence window sizes if not for fixed `sliding_window_size`
202            track_metrics: true,
203            slow_call_duration_threshold: None, // e.g., Some(Duration::from_millis(500))
204            slow_call_rate_threshold: None,     // e.g., Some(0.3) for 30% slow calls
205            circuit_breaker_threshold: 3,       // Default to 3 consecutive failures
206            circuit_breaker_cooldown: Duration::from_secs(60), // Default to 60 seconds cooldown
207        }
208    }
209}
210
211impl fmt::Debug for CircuitBreakerConfig {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        f.debug_struct("CircuitBreakerConfig")
214            .field("failure_threshold", &self.failure_threshold)
215            .field("failure_rate_threshold", &self.failure_rate_threshold)
216            .field(
217                "minimum_request_threshold_for_rate",
218                &self.minimum_request_threshold_for_rate,
219            )
220            .field(
221                "success_threshold_to_close",
222                &self.success_threshold_to_close,
223            )
224            .field("reset_timeout", &self.reset_timeout)
225            .field(
226                "half_open_max_concurrent_operations",
227                &self.half_open_max_concurrent_operations,
228            )
229            .field("operation_timeout", &self.operation_timeout)
230            .field("sliding_window_size", &self.sliding_window_size)
231            .field(
232                "error_predicate",
233                &if self.error_predicate.is_some() {
234                    "Some(<function>)"
235                } else {
236                    "None"
237                },
238            )
239            .field("metrics_window_size", &self.metrics_window_size)
240            .field("track_metrics", &self.track_metrics)
241            .field(
242                "slow_call_duration_threshold",
243                &self.slow_call_duration_threshold,
244            )
245            .field("slow_call_rate_threshold", &self.slow_call_rate_threshold)
246            .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
247            .field("circuit_breaker_cooldown", &self.circuit_breaker_cooldown)
248            .finish()
249    }
250}
251
/// Mutable runtime state of a `CircuitBreaker`, kept behind its `RwLock`.
#[derive(Debug)]
struct InnerState {
    /// Current state of the breaker.
    state: CircuitBreakerState,
    /// Monotonic instant at which the breaker was last opened; compared with
    /// `reset_timeout` to decide when a trial call may be let through.
    /// Cleared on reset.
    opened_at: Option<Instant>,
    /// Monotonic instant at which the breaker last entered half-open (cleared on reset).
    half_open_entered_at: Option<Instant>,
    /// Number of failures recorded in a row.
    consecutive_failures: usize,
    /// Number of successes recorded in a row.
    consecutive_successes: usize,
    /// Number of trial operations currently running in half-open state.
    half_open_concurrency_count: usize,
    /// Recent outcomes: `true` for success, `false` for failure.
    results_window: VecDeque<bool>,
    /// Recent slow-call flags: `true` if the call was slow.
    slow_call_window: VecDeque<bool>,
    /// Metrics handed out (as a clone) by `CircuitBreaker::metrics`.
    metrics: CircuitMetrics,
}
264
265impl Default for InnerState {
266    fn default() -> Self {
267        Self {
268            state: CircuitBreakerState::Closed,
269            opened_at: None,
270            half_open_entered_at: None,
271            consecutive_failures: 0,
272            consecutive_successes: 0,
273            half_open_concurrency_count: 0,
274            results_window: VecDeque::with_capacity(100),
275            slow_call_window: VecDeque::with_capacity(100),
276            metrics: CircuitMetrics::default(),
277        }
278    }
279}
280
/// A circuit breaker implementation to prevent cascading failures.
///
/// Instances are created with `CircuitBreaker::new`, which returns the
/// breaker inside an `Arc` so it can be shared. All mutable state lives behind
/// internal locks, so every method takes `&self`.
pub struct CircuitBreaker {
    /// Name of this breaker; included in the `CircuitBreakerOpen` errors it returns.
    name: String,
    /// Thresholds and timeouts, fixed at construction time.
    config: CircuitBreakerConfig,
    /// Runtime state (current state, counters, windows, metrics).
    inner: RwLock<InnerState>,
    /// Registered observers, appended to by `add_observer`.
    observers: Mutex<Vec<Arc<dyn CircuitBreakerObserver>>>,
}
288
289impl CircuitBreaker {
290    /// Creates a new CircuitBreaker instance
291    pub fn new(name: impl Into<String>, config: CircuitBreakerConfig) -> Arc<Self> {
292        Arc::new(Self {
293            name: name.into(),
294            config,
295            inner: RwLock::new(InnerState::default()),
296            observers: Mutex::new(Vec::new()),
297        })
298    }
299
300    /// Add an observer to the circuit breaker
301    pub fn add_observer(&self, observer: Arc<dyn CircuitBreakerObserver>) {
302        let mut observers = self.observers.lock().unwrap();
303        observers.push(observer);
304    }
305
306    /// Get the current state of the circuit breaker
307    pub fn state(&self) -> CircuitBreakerState {
308        let inner = self.inner.read().unwrap();
309        inner.state
310    }
311
312    /// Get the current metrics of the circuit breaker
313    pub fn metrics(&self) -> CircuitMetrics {
314        let inner = self.inner.read().unwrap();
315        inner.metrics.clone()
316    }
317
318    /// Trip the circuit breaker manually
319    pub fn trip(&self) {
320        let mut inner = self.inner.write().unwrap();
321        let prev_state = inner.state;
322        inner.state = CircuitBreakerState::Open;
323        inner.opened_at = Some(Instant::now());
324        inner.consecutive_failures = self.config.failure_threshold;
325        inner.consecutive_successes = 0;
326
327        let event = CircuitTransitionEvent {
328            from_state: prev_state,
329            to_state: CircuitBreakerState::Open,
330            timestamp: SystemTime::now(),
331            reason: "Manual trip".to_string(),
332        };
333
334        // Update metrics
335        inner.metrics.state = CircuitBreakerState::Open;
336        inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
337        inner.metrics.consecutive_successes = 0;
338        inner.metrics.last_transition_timestamp = Some(SystemTime::now());
339
340        // Drop the lock before calling observers
341        drop(inner);
342
343        // Notify observers
344        self.notify_state_change(&event);
345    }
346
347    /// Reset the circuit breaker to closed state
348    pub fn reset(&self) {
349        let mut inner = self.inner.write().unwrap();
350        let prev_state = inner.state;
351        inner.state = CircuitBreakerState::Closed;
352        inner.opened_at = None;
353        inner.half_open_entered_at = None;
354        inner.consecutive_failures = 0;
355        inner.consecutive_successes = 0;
356        inner.half_open_concurrency_count = 0;
357
358        // Update metrics
359        inner.metrics.state = CircuitBreakerState::Closed;
360        inner.metrics.consecutive_failures = 0;
361        inner.metrics.consecutive_successes = 0;
362        inner.metrics.last_transition_timestamp = Some(SystemTime::now());
363
364        // Clear windows
365        inner.results_window.clear();
366        inner.slow_call_window.clear();
367
368        let event = CircuitTransitionEvent {
369            from_state: prev_state,
370            to_state: CircuitBreakerState::Closed,
371            timestamp: SystemTime::now(),
372            reason: "Manual reset".to_string(),
373        };
374
375        // Drop the lock before calling observers
376        drop(inner);
377
378        // Notify observers
379        self.notify_state_change(&event);
380        self.notify_reset();
381    }
382
383    /// Execute an operation through the circuit breaker
384    #[cfg(not(feature = "std-thread"))]
385    pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
386    where
387        F: FnOnce() -> Result<Ret>,
388    {
389        let start_time = Instant::now();
390        let state = self.state();
391
392        self.notify_operation_attempt(state);
393
394        match state {
395            CircuitBreakerState::Open => {
396                // Check if reset timeout has elapsed
397                let inner = self.inner.read().unwrap();
398                let should_transition = if let Some(opened_at) = inner.opened_at {
399                    opened_at.elapsed() >= self.config.reset_timeout
400                } else {
401                    false
402                };
403                drop(inner);
404
405                if should_transition {
406                    self.transition_to_half_open("Reset timeout elapsed");
407                    // Continue with half-open logic
408                    self.execute_half_open(operation, start_time)
409                } else {
410                    // Still open, reject the operation
411                    self.record_rejected();
412                    Err(DecrustError::CircuitBreakerOpen {
413                        name: self.name.clone(),
414                        retry_after: Some(
415                            self.config
416                                .reset_timeout
417                                .checked_sub(
418                                    self.inner.read().unwrap().opened_at.unwrap().elapsed(),
419                                )
420                                .unwrap_or_default(),
421                        ),
422                        failure_count: None,
423                        last_error: None,
424                        backtrace: Backtrace::generate(),
425                    })
426                }
427            }
428            CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
429            CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
430        }
431    }
432
433    /// Execute an operation through the circuit breaker
434    #[cfg(feature = "std-thread")]
435    pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
436    where
437        F: FnOnce() -> Result<Ret> + Send + 'static,
438        Ret: Send + 'static,
439    {
440        let start_time = Instant::now();
441        let state = self.state();
442
443        self.notify_operation_attempt(state);
444
445        match state {
446            CircuitBreakerState::Open => {
447                // Check if reset timeout has elapsed
448                let inner = self.inner.read().unwrap();
449                let should_transition = if let Some(opened_at) = inner.opened_at {
450                    opened_at.elapsed() >= self.config.reset_timeout
451                } else {
452                    false
453                };
454                drop(inner);
455
456                if should_transition {
457                    self.transition_to_half_open("Reset timeout elapsed");
458                    // Continue with half-open logic
459                    self.execute_half_open(operation, start_time)
460                } else {
461                    // Still open, reject the operation
462                    self.record_rejected();
463                    Err(DecrustError::CircuitBreakerOpen {
464                        name: self.name.clone(),
465                        retry_after: Some(
466                            self.config
467                                .reset_timeout
468                                .checked_sub(
469                                    self.inner.read().unwrap().opened_at.unwrap().elapsed(),
470                                )
471                                .unwrap_or_default(),
472                        ),
473                        failure_count: Some(self.inner.read().unwrap().consecutive_failures as u32),
474                        last_error: None, // InnerState doesn't track last_error
475                        backtrace: Backtrace::generate(),
476                    })
477                }
478            }
479            CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
480            CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
481        }
482    }
483
484    /// Execute an async operation through the circuit breaker
485    #[cfg(feature = "tokio")]
486    pub async fn execute_async<F, Fut, Ret>(&self, operation: F) -> Result<Ret>
487    where
488        F: FnOnce() -> Fut,
489        Fut: std::future::Future<Output = Result<Ret>>,
490    {
491        let start_time = Instant::now();
492        let state = self.state();
493
494        self.notify_operation_attempt(state);
495
496        match state {
497            CircuitBreakerState::Open => {
498                // Check if reset timeout has elapsed
499                let inner = self.inner.read().unwrap();
500                let should_transition = if let Some(opened_at) = inner.opened_at {
501                    opened_at.elapsed() >= self.config.reset_timeout
502                } else {
503                    false
504                };
505                drop(inner);
506
507                if should_transition {
508                    self.transition_to_half_open("Reset timeout elapsed");
509                    // Continue with half-open logic
510                    self.execute_half_open_async(operation, start_time).await
511                } else {
512                    // Still open, reject the operation
513                    self.record_rejected();
514                    Err(DecrustError::CircuitBreakerOpen {
515                        name: self.name.clone(),
516                        retry_after: Some(
517                            self.config
518                                .reset_timeout
519                                .checked_sub(
520                                    self.inner.read().unwrap().opened_at.unwrap().elapsed(),
521                                )
522                                .unwrap_or_default(),
523                        ),
524                        failure_count: Some(self.inner.read().unwrap().consecutive_failures as u32),
525                        last_error: None, // InnerState doesn't track last_error
526                        backtrace: Backtrace::generate(),
527                    })
528                }
529            }
530            CircuitBreakerState::HalfOpen => {
531                self.execute_half_open_async(operation, start_time).await
532            }
533            CircuitBreakerState::Closed => self.execute_closed_async(operation, start_time).await,
534        }
535    }
536
537    // Private helper methods
538
539    // Execute operation in Closed state
540    #[cfg(not(feature = "std-thread"))]
541    fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
542    where
543        F: FnOnce() -> Result<Ret>,
544    {
545        let result = if let Some(timeout) = self.config.operation_timeout {
546            self.execute_with_timeout(operation, timeout)
547        } else {
548            operation()
549        };
550
551        let duration = start_time.elapsed();
552
553        match &result {
554            Ok(_) => {
555                self.record_success(duration);
556            }
557            Err(e) => {
558                if self.should_count_as_failure(e) {
559                    self.record_failure(e, duration);
560
561                    // Check if we need to open the circuit
562                    if self.should_open_circuit() {
563                        self.transition_to_open("Failure threshold reached");
564                    }
565                } else {
566                    // Error not counted as failure for circuit breaking
567                    self.record_success(duration);
568                }
569            }
570        }
571
572        result
573    }
574
575    #[cfg(feature = "std-thread")]
576    fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
577    where
578        F: FnOnce() -> Result<Ret> + Send + 'static,
579        Ret: Send + 'static,
580    {
581        let result = if let Some(timeout) = self.config.operation_timeout {
582            self.execute_with_timeout(operation, timeout)
583        } else {
584            operation()
585        };
586
587        let duration = start_time.elapsed();
588
589        match &result {
590            Ok(_) => {
591                self.record_success(duration);
592            }
593            Err(e) => {
594                if self.should_count_as_failure(e) {
595                    self.record_failure(e, duration);
596
597                    // Check if we need to open the circuit
598                    if self.should_open_circuit() {
599                        self.transition_to_open("Failure threshold reached");
600                    }
601                } else {
602                    // Error not counted as failure for circuit breaking
603                    self.record_success(duration);
604                }
605            }
606        }
607
608        result
609    }
610
611    // Execute operation in HalfOpen state
612    #[cfg(not(feature = "std-thread"))]
613    fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
614    where
615        F: FnOnce() -> Result<Ret>,
616    {
617        // Check if we can proceed with the operation
618        {
619            let mut inner = self.inner.write().unwrap();
620            if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
621            {
622                // Too many concurrent operations in half-open state
623                self.record_rejected();
624                return Err(DecrustError::CircuitBreakerOpen {
625                    name: self.name.clone(),
626                    retry_after: Some(Duration::from_millis(100)),
627                    failure_count: None,
628                    last_error: None,
629                    backtrace: Backtrace::generate(),
630                });
631            }
632
633            // Increment concurrency count
634            inner.half_open_concurrency_count += 1;
635        }
636
637        // Execute the operation
638        let result = if let Some(timeout) = self.config.operation_timeout {
639            self.execute_with_timeout(operation, timeout)
640        } else {
641            operation()
642        };
643
644        let duration = start_time.elapsed();
645
646        // Decrement concurrency count
647        {
648            let mut inner = self.inner.write().unwrap();
649            inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
650        }
651
652        match &result {
653            Ok(_) => {
654                self.record_success(duration);
655
656                // Check if we can close the circuit
657                let close_circuit = {
658                    let inner = self.inner.read().unwrap();
659                    inner.consecutive_successes >= self.config.success_threshold_to_close
660                };
661
662                if close_circuit {
663                    self.transition_to_closed("Success threshold reached");
664                }
665            }
666            Err(e) => {
667                if self.should_count_as_failure(e) {
668                    self.record_failure(e, duration);
669
670                    // Any failure in half-open should open the circuit again
671                    self.transition_to_open("Failure in half-open state");
672                } else {
673                    // Error not counted as failure for circuit breaking
674                    self.record_success(duration);
675                }
676            }
677        }
678
679        result
680    }
681
682    #[cfg(feature = "std-thread")]
683    fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
684    where
685        F: FnOnce() -> Result<Ret> + Send + 'static,
686        Ret: Send + 'static,
687    {
688        // Check if we can proceed with the operation
689        {
690            let mut inner = self.inner.write().unwrap();
691            if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
692            {
693                // Too many concurrent operations in half-open state
694                self.record_rejected();
695                return Err(DecrustError::CircuitBreakerOpen {
696                    name: self.name.clone(),
697                    retry_after: Some(Duration::from_millis(100)),
698                    failure_count: Some(self.inner.read().unwrap().consecutive_failures as u32),
699                    last_error: None, // InnerState doesn't track last_error
700                    backtrace: Backtrace::generate(),
701                });
702            }
703
704            // Increment concurrency count
705            inner.half_open_concurrency_count += 1;
706        }
707
708        // Execute the operation
709        let result = if let Some(timeout) = self.config.operation_timeout {
710            self.execute_with_timeout(operation, timeout)
711        } else {
712            operation()
713        };
714
715        let duration = start_time.elapsed();
716
717        // Decrement concurrency count
718        {
719            let mut inner = self.inner.write().unwrap();
720            inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
721        }
722
723        match &result {
724            Ok(_) => {
725                self.record_success(duration);
726
727                // Check if we can close the circuit
728                let close_circuit = {
729                    let inner = self.inner.read().unwrap();
730                    inner.consecutive_successes >= self.config.success_threshold_to_close
731                };
732
733                if close_circuit {
734                    self.transition_to_closed("Success threshold reached");
735                }
736            }
737            Err(e) => {
738                if self.should_count_as_failure(e) {
739                    self.record_failure(e, duration);
740
741                    // Any failure in half-open should open the circuit again
742                    self.transition_to_open("Failure in half-open state");
743                } else {
744                    // Error not counted as failure for circuit breaking
745                    self.record_success(duration);
746                }
747            }
748        }
749
750        result
751    }
752
753    // Async versions
754
755    #[cfg(feature = "tokio")]
756    async fn execute_closed_async<F, Fut, Ret>(
757        &self,
758        operation: F,
759        start_time: Instant,
760    ) -> Result<Ret>
761    where
762        F: FnOnce() -> Fut,
763        Fut: std::future::Future<Output = Result<Ret>>,
764    {
765        let result = if let Some(timeout) = self.config.operation_timeout {
766            self.execute_with_timeout_async(operation, timeout).await
767        } else {
768            operation().await
769        };
770
771        let duration = start_time.elapsed();
772
773        match &result {
774            Ok(_) => {
775                self.record_success(duration);
776            }
777            Err(e) => {
778                if self.should_count_as_failure(e) {
779                    self.record_failure(e, duration);
780
781                    // Check if we need to open the circuit
782                    if self.should_open_circuit() {
783                        self.transition_to_open("Failure threshold reached");
784                    }
785                } else {
786                    // Error not counted as failure for circuit breaking
787                    self.record_success(duration);
788                }
789            }
790        }
791
792        result
793    }
794
795    #[cfg(feature = "tokio")]
796    async fn execute_half_open_async<F, Fut, Ret>(
797        &self,
798        operation: F,
799        start_time: Instant,
800    ) -> Result<Ret>
801    where
802        F: FnOnce() -> Fut,
803        Fut: std::future::Future<Output = Result<Ret>>,
804    {
805        // Check if we can proceed with the operation
806        {
807            let mut inner = self.inner.write().unwrap();
808            if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
809            {
810                // Too many concurrent operations in half-open state
811                self.record_rejected();
812                return Err(DecrustError::CircuitBreakerOpen {
813                    name: self.name.clone(),
814                    retry_after: Some(Duration::from_millis(100)),
815                    failure_count: Some(self.inner.read().unwrap().consecutive_failures as u32),
816                    last_error: None, // InnerState doesn't track last_error
817                    backtrace: Backtrace::generate(),
818                });
819            }
820
821            // Increment concurrency count
822            inner.half_open_concurrency_count += 1;
823        }
824
825        // Execute the operation
826        let result = if let Some(timeout) = self.config.operation_timeout {
827            self.execute_with_timeout_async(operation, timeout).await
828        } else {
829            operation().await
830        };
831
832        let duration = start_time.elapsed();
833
834        // Decrement concurrency count
835        {
836            let mut inner = self.inner.write().unwrap();
837            inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
838        }
839
840        match &result {
841            Ok(_) => {
842                self.record_success(duration);
843
844                // Check if we can close the circuit
845                let close_circuit = {
846                    let inner = self.inner.read().unwrap();
847                    inner.consecutive_successes >= self.config.success_threshold_to_close
848                };
849
850                if close_circuit {
851                    self.transition_to_closed("Success threshold reached");
852                }
853            }
854            Err(e) => {
855                if self.should_count_as_failure(e) {
856                    self.record_failure(e, duration);
857
858                    // Any failure in half-open should open the circuit again
859                    self.transition_to_open("Failure in half-open state");
860                } else {
861                    // Error not counted as failure for circuit breaking
862                    self.record_success(duration);
863                }
864            }
865        }
866
867        result
868    }
869
870    // Timeout helpers
871
872    // Non-threaded timeout implementation
873    #[cfg(not(feature = "std-thread"))]
874    fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
875    where
876        F: FnOnce() -> Result<Ret>,
877    {
878        // Fallback implementation without threads
879        let start = Instant::now();
880        let result = operation();
881        if start.elapsed() > timeout {
882            self.record_timeout();
883            Err(DecrustError::Timeout {
884                operation: format!("Operation in circuit breaker '{}'", self.name),
885                duration: timeout,
886                backtrace: Backtrace::generate(),
887            })
888        } else {
889            result
890        }
891    }
892
893    // Threaded timeout implementation
894    #[cfg(feature = "std-thread")]
895    fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
896    where
897        F: FnOnce() -> Result<Ret> + Send + 'static,
898        Ret: Send + 'static,
899    {
900        use std::sync::mpsc;
901        use std::thread;
902
903        let (tx, rx) = mpsc::channel();
904
905        let handle = thread::spawn(move || {
906            let result = operation();
907            let _ = tx.send(result);
908        });
909
910        match rx.recv_timeout(timeout) {
911            Ok(result) => {
912                // Operation completed within timeout
913                let _ = handle.join();
914                result
915            }
916            Err(_) => {
917                // Operation timed out
918                self.record_timeout();
919                Err(DecrustError::Timeout {
920                    operation: format!("Operation in circuit breaker '{}'", self.name),
921                    duration: timeout,
922                    backtrace: Backtrace::generate(),
923                })
924            }
925        }
926    }
927
928    #[cfg(feature = "tokio")]
929    async fn execute_with_timeout_async<F, Fut, Ret>(
930        &self,
931        operation: F,
932        timeout: Duration,
933    ) -> Result<Ret>
934    where
935        F: FnOnce() -> Fut,
936        Fut: std::future::Future<Output = Result<Ret>>,
937    {
938        match time::timeout(timeout, operation()).await {
939            Ok(result) => result,
940            Err(_) => {
941                self.record_timeout();
942                Err(DecrustError::Timeout {
943                    operation: format!("Operation in circuit breaker '{}'", self.name),
944                    duration: timeout,
945                    backtrace: Backtrace::generate(),
946                })
947            }
948        }
949    }
950
951    // State transition helpers
952
953    fn transition_to_open(&self, reason: &str) {
954        let mut inner = self.inner.write().unwrap();
955        let prev_state = inner.state;
956        inner.state = CircuitBreakerState::Open;
957        inner.opened_at = Some(Instant::now());
958        inner.consecutive_successes = 0;
959
960        let event = CircuitTransitionEvent {
961            from_state: prev_state,
962            to_state: CircuitBreakerState::Open,
963            timestamp: SystemTime::now(),
964            reason: reason.to_string(),
965        };
966
967        // Update metrics
968        inner.metrics.state = CircuitBreakerState::Open;
969        inner.metrics.last_transition_timestamp = Some(SystemTime::now());
970
971        // Drop the lock before calling observers
972        drop(inner);
973
974        info!(
975            "Circuit breaker '{}' transitioning to Open: {}",
976            self.name, reason
977        );
978        self.notify_state_change(&event);
979    }
980
981    fn transition_to_half_open(&self, reason: &str) {
982        let mut inner = self.inner.write().unwrap();
983        let prev_state = inner.state;
984        inner.state = CircuitBreakerState::HalfOpen;
985        inner.half_open_entered_at = Some(Instant::now());
986        inner.consecutive_successes = 0;
987        inner.half_open_concurrency_count = 0;
988
989        let event = CircuitTransitionEvent {
990            from_state: prev_state,
991            to_state: CircuitBreakerState::HalfOpen,
992            timestamp: SystemTime::now(),
993            reason: reason.to_string(),
994        };
995
996        // Update metrics
997        inner.metrics.state = CircuitBreakerState::HalfOpen;
998        inner.metrics.last_transition_timestamp = Some(SystemTime::now());
999
1000        // Drop the lock before calling observers
1001        drop(inner);
1002
1003        info!(
1004            "Circuit breaker '{}' transitioning to HalfOpen: {}",
1005            self.name, reason
1006        );
1007        self.notify_state_change(&event);
1008    }
1009
1010    fn transition_to_closed(&self, reason: &str) {
1011        let mut inner = self.inner.write().unwrap();
1012        let prev_state = inner.state;
1013        inner.state = CircuitBreakerState::Closed;
1014        inner.opened_at = None;
1015        inner.half_open_entered_at = None;
1016        inner.consecutive_failures = 0;
1017
1018        let event = CircuitTransitionEvent {
1019            from_state: prev_state,
1020            to_state: CircuitBreakerState::Closed,
1021            timestamp: SystemTime::now(),
1022            reason: reason.to_string(),
1023        };
1024
1025        // Update metrics
1026        inner.metrics.state = CircuitBreakerState::Closed;
1027        inner.metrics.last_transition_timestamp = Some(SystemTime::now());
1028
1029        // Drop the lock before calling observers
1030        drop(inner);
1031
1032        info!(
1033            "Circuit breaker '{}' transitioning to Closed: {}",
1034            self.name, reason
1035        );
1036        self.notify_state_change(&event);
1037    }
1038
1039    // Result recording helpers
1040
1041    fn record_success(&self, duration: Duration) {
1042        let mut inner = self.inner.write().unwrap();
1043        inner.consecutive_successes += 1;
1044        inner.consecutive_failures = 0;
1045
1046        // Update sliding window
1047        if inner.results_window.len() >= self.config.sliding_window_size {
1048            inner.results_window.pop_front();
1049        }
1050        inner.results_window.push_back(true);
1051
1052        // Check if the call was slow
1053        let was_slow = if let Some(threshold) = self.config.slow_call_duration_threshold {
1054            duration >= threshold
1055        } else {
1056            false
1057        };
1058
1059        // Update slow call window
1060        if inner.slow_call_window.len() >= self.config.sliding_window_size {
1061            inner.slow_call_window.pop_front();
1062        }
1063        inner.slow_call_window.push_back(was_slow);
1064
1065        // Update metrics
1066        inner.metrics.total_requests += 1;
1067        inner.metrics.successful_requests += 1;
1068        inner.metrics.consecutive_successes = inner.consecutive_successes as u32;
1069        inner.metrics.consecutive_failures = 0;
1070
1071        // Calculate rates
1072        self.update_rates(&mut inner);
1073
1074        drop(inner);
1075
1076        self.notify_operation_result(CircuitOperationType::Success, duration, None);
1077    }
1078
1079    fn record_failure(&self, error: &DecrustError, duration: Duration) {
1080        let mut inner = self.inner.write().unwrap();
1081        inner.consecutive_failures += 1;
1082        inner.consecutive_successes = 0;
1083
1084        // Update sliding window
1085        if inner.results_window.len() >= self.config.sliding_window_size {
1086            inner.results_window.pop_front();
1087        }
1088        inner.results_window.push_back(false);
1089
1090        // Check if the call was slow (although it failed)
1091        let was_slow = if let Some(threshold) = self.config.slow_call_duration_threshold {
1092            duration >= threshold
1093        } else {
1094            false
1095        };
1096
1097        // Update slow call window
1098        if inner.slow_call_window.len() >= self.config.sliding_window_size {
1099            inner.slow_call_window.pop_front();
1100        }
1101        inner.slow_call_window.push_back(was_slow);
1102
1103        // Update metrics
1104        inner.metrics.total_requests += 1;
1105        inner.metrics.failed_requests += 1;
1106        inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
1107        inner.metrics.consecutive_successes = 0;
1108        inner.metrics.last_error_timestamp = Some(SystemTime::now());
1109
1110        // Calculate rates
1111        self.update_rates(&mut inner);
1112
1113        let error_clone = error.clone(); // This requires Clone for DecrustError
1114        drop(inner);
1115
1116        self.notify_operation_result(CircuitOperationType::Failure, duration, Some(&error_clone));
1117    }
1118
1119    fn record_rejected(&self) {
1120        let mut inner = self.inner.write().unwrap();
1121        inner.metrics.total_requests += 1;
1122        inner.metrics.rejected_requests += 1;
1123        drop(inner);
1124
1125        // Zero duration since operation was rejected
1126        self.notify_operation_result(CircuitOperationType::Rejected, Duration::from_secs(0), None);
1127    }
1128
1129    fn record_timeout(&self) {
1130        let mut inner = self.inner.write().unwrap();
1131        inner.consecutive_failures += 1;
1132        inner.consecutive_successes = 0;
1133
1134        // Update sliding window
1135        if inner.results_window.len() >= self.config.sliding_window_size {
1136            inner.results_window.pop_front();
1137        }
1138        inner.results_window.push_back(false);
1139
1140        // Update metrics
1141        inner.metrics.total_requests += 1;
1142        inner.metrics.timeout_requests += 1;
1143        inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
1144        inner.metrics.consecutive_successes = 0;
1145        inner.metrics.last_error_timestamp = Some(SystemTime::now());
1146
1147        // Calculate rates
1148        self.update_rates(&mut inner);
1149
1150        drop(inner);
1151
1152        let timeout_error = DecrustError::Timeout {
1153            operation: format!("Operation in circuit breaker '{}'", self.name),
1154            duration: self.config.operation_timeout.unwrap_or_default(),
1155            backtrace: Backtrace::generate(),
1156        };
1157
1158        self.notify_operation_result(
1159            CircuitOperationType::Timeout,
1160            self.config.operation_timeout.unwrap_or_default(),
1161            Some(&timeout_error),
1162        );
1163    }
1164
1165    // Helper methods
1166
1167    fn should_open_circuit(&self) -> bool {
1168        let inner = self.inner.read().unwrap();
1169
1170        // Open if consecutive failures exceed threshold
1171        if inner.consecutive_failures >= self.config.failure_threshold {
1172            return true;
1173        }
1174
1175        // Check failure rate if we have enough samples
1176        if inner.results_window.len() >= self.config.minimum_request_threshold_for_rate {
1177            let failure_count = inner
1178                .results_window
1179                .iter()
1180                .filter(|&&success| !success)
1181                .count();
1182            let failure_rate = failure_count as f64 / inner.results_window.len() as f64;
1183
1184            if failure_rate >= self.config.failure_rate_threshold {
1185                return true;
1186            }
1187        }
1188
1189        // Check slow call rate if configured
1190        if let (Some(threshold), true) = (
1191            self.config.slow_call_rate_threshold,
1192            !inner.slow_call_window.is_empty(),
1193        ) {
1194            let slow_count = inner.slow_call_window.iter().filter(|&&slow| slow).count();
1195            let slow_rate = slow_count as f64 / inner.slow_call_window.len() as f64;
1196
1197            if slow_rate >= threshold {
1198                return true;
1199            }
1200        }
1201
1202        false
1203    }
1204
1205    fn should_count_as_failure(&self, error: &DecrustError) -> bool {
1206        // If there's a custom predicate, use that
1207        if let Some(predicate) = &self.config.error_predicate {
1208            return (predicate.as_ref())(error);
1209        }
1210
1211        // By default, all errors count as failures
1212        true
1213    }
1214
1215    fn update_rates(&self, inner: &mut InnerState) {
1216        if inner.results_window.is_empty() {
1217            inner.metrics.failure_rate_in_window = None;
1218        } else {
1219            let failure_count = inner
1220                .results_window
1221                .iter()
1222                .filter(|&&success| !success)
1223                .count();
1224            let failure_rate = failure_count as f64 / inner.results_window.len() as f64;
1225            inner.metrics.failure_rate_in_window = Some(failure_rate);
1226        }
1227
1228        if inner.slow_call_window.is_empty() {
1229            inner.metrics.slow_call_rate_in_window = None;
1230        } else {
1231            let slow_count = inner.slow_call_window.iter().filter(|&&slow| slow).count();
1232            let slow_rate = slow_count as f64 / inner.slow_call_window.len() as f64;
1233            inner.metrics.slow_call_rate_in_window = Some(slow_rate);
1234        }
1235    }
1236
1237    // Observer notification methods
1238
1239    fn notify_state_change(&self, event: &CircuitTransitionEvent) {
1240        let observers = self.observers.lock().unwrap();
1241        for observer in &*observers {
1242            observer.on_state_change(&self.name, event);
1243        }
1244    }
1245
1246    fn notify_operation_attempt(&self, state: CircuitBreakerState) {
1247        let observers = self.observers.lock().unwrap();
1248        for observer in &*observers {
1249            observer.on_operation_attempt(&self.name, state);
1250        }
1251    }
1252
1253    fn notify_operation_result(
1254        &self,
1255        op_type: CircuitOperationType,
1256        duration: Duration,
1257        error: Option<&DecrustError>,
1258    ) {
1259        let observers = self.observers.lock().unwrap();
1260        for observer in &*observers {
1261            observer.on_operation_result(&self.name, op_type, duration, error);
1262        }
1263    }
1264    fn notify_reset(&self) {
1265        let observers = self.observers.lock().unwrap();
1266        for observer in &*observers {
1267            observer.on_reset(&self.name);
1268        }
1269    }
1270}