Skip to main content

ccxt_core/
circuit_breaker.rs

1//! Circuit Breaker module for preventing cascading failures.
2//!
3//! This module implements the Circuit Breaker pattern to protect against
4//! continuously failing endpoints. When failures exceed a threshold, the
5//! circuit "opens" and rejects requests immediately, allowing the system
6//! to recover.
7//!
8//! # States
9//!
10//! The circuit breaker has three states:
11//!
12//! - **Closed**: Normal operation, requests are allowed
13//! - **Open**: Blocking requests due to failures, requests are rejected immediately
14//! - **HalfOpen**: Testing if the service has recovered, allows limited requests
15//!
16//! # State Transitions
17//!
18//! ```text
19//! ┌─────────┐  failure_count >= threshold  ┌──────┐
20//! │ Closed  │ ──────────────────────────▶  │ Open │
21//! └─────────┘                              └──────┘
22//!      ▲                                       │
23//!      │ test request succeeds                 │ reset_timeout elapsed
24//!      │                                       ▼
25//!      │                                  ┌──────────┐
26//!      └────────────────────────────────  │ HalfOpen │
27//!                                         └──────────┘
28//!                                              │
29//!                                              │ test request fails
30//!                                              ▼
31//!                                         ┌──────┐
32//!                                         │ Open │
33//!                                         └──────┘
34//! ```
35//!
36//! # Example
37//!
38//! ```rust
39//! use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitState};
40//! use std::time::Duration;
41//!
42//! // Create a circuit breaker with custom configuration
43//! let config = CircuitBreakerConfig {
44//!     failure_threshold: 5,
45//!     reset_timeout: Duration::from_secs(30),
46//!     success_threshold: 1,
47//! };
48//! let breaker = CircuitBreaker::new(config);
49//!
50//! // Check if request is allowed
51//! if breaker.allow_request().is_ok() {
52//!     // Make the request
53//!     // On success:
54//!     breaker.record_success();
55//!     // On failure:
56//!     // breaker.record_failure();
57//! }
58//! ```
59
60use crate::error::{ConfigValidationError, Error, Result, ValidationResult};
61use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};
62use std::time::Duration;
63use tokio::sync::mpsc;
64use tracing::{debug, info, warn};
65
66/// Circuit breaker states.
67///
68/// The circuit breaker transitions between these states based on
69/// request success/failure patterns.
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71#[repr(u8)]
72pub enum CircuitState {
73    /// Normal operation, requests are allowed.
74    /// Failures are counted, and if they exceed the threshold,
75    /// the circuit transitions to Open.
76    Closed = 0,
77
78    /// Blocking requests due to failures.
79    /// All requests are rejected immediately with a `ResourceExhausted` error.
80    /// After `reset_timeout` elapses, transitions to HalfOpen.
81    Open = 1,
82
83    /// Testing if the service has recovered.
84    /// Allows a limited number of test requests.
85    /// If successful, transitions to Closed; if failed, transitions back to Open.
86    HalfOpen = 2,
87}
88
89impl CircuitState {
90    /// Converts a u8 value to a CircuitState.
91    fn from_u8(value: u8) -> Self {
92        match value {
93            1 => CircuitState::Open,
94            2 => CircuitState::HalfOpen,
95            _ => CircuitState::Closed, // Default to Closed for invalid values (including 0)
96        }
97    }
98}
99
100/// Circuit breaker configuration.
101///
102/// # Example
103///
104/// ```rust
105/// use ccxt_core::circuit_breaker::CircuitBreakerConfig;
106/// use std::time::Duration;
107///
108/// let config = CircuitBreakerConfig {
109///     failure_threshold: 5,
110///     reset_timeout: Duration::from_secs(30),
111///     success_threshold: 1,
112/// };
113/// ```
114#[derive(Debug, Clone)]
115pub struct CircuitBreakerConfig {
116    /// Number of consecutive failures before opening the circuit.
117    ///
118    /// When the failure count reaches this threshold, the circuit
119    /// transitions from Closed to Open state.
120    ///
121    /// Default: 5
122    pub failure_threshold: u32,
123
124    /// Time to wait in Open state before transitioning to HalfOpen.
125    ///
126    /// After this duration elapses, the circuit breaker will allow
127    /// a test request to check if the service has recovered.
128    ///
129    /// Default: 30 seconds
130    pub reset_timeout: Duration,
131
132    /// Number of consecutive successes needed to close the circuit from HalfOpen.
133    ///
134    /// When in HalfOpen state, this many successful requests are required
135    /// before transitioning back to Closed state.
136    ///
137    /// Default: 1
138    pub success_threshold: u32,
139}
140
141impl Default for CircuitBreakerConfig {
142    fn default() -> Self {
143        Self {
144            failure_threshold: 5,
145            reset_timeout: Duration::from_secs(30),
146            success_threshold: 1,
147        }
148    }
149}
150
151impl CircuitBreakerConfig {
152    /// Creates a new circuit breaker configuration with the given parameters.
153    ///
154    /// # Arguments
155    ///
156    /// * `failure_threshold` - Number of failures before opening the circuit
157    /// * `reset_timeout` - Time to wait before testing recovery
158    /// * `success_threshold` - Number of successes needed to close the circuit
159    pub fn new(failure_threshold: u32, reset_timeout: Duration, success_threshold: u32) -> Self {
160        Self {
161            failure_threshold,
162            reset_timeout,
163            success_threshold,
164        }
165    }
166
167    /// Validates the circuit breaker configuration.
168    ///
169    /// # Returns
170    ///
171    /// Returns `Ok(ValidationResult)` if the configuration is valid.
172    /// Returns `Err(ConfigValidationError)` if the configuration is invalid.
173    ///
174    /// # Validation Rules
175    ///
176    /// - `failure_threshold` must be > 0
177    /// - `success_threshold` must be > 0
178    /// - `reset_timeout` must be >= 1 second
179    ///
180    /// # Example
181    ///
182    /// ```rust
183    /// use ccxt_core::circuit_breaker::CircuitBreakerConfig;
184    ///
185    /// let config = CircuitBreakerConfig::default();
186    /// assert!(config.validate().is_ok());
187    /// ```
188    pub fn validate(&self) -> std::result::Result<ValidationResult, ConfigValidationError> {
189        let mut warnings = Vec::new();
190
191        if self.failure_threshold == 0 {
192            return Err(ConfigValidationError::invalid(
193                "failure_threshold",
194                "failure_threshold must be greater than 0",
195            ));
196        }
197
198        if self.success_threshold == 0 {
199            return Err(ConfigValidationError::invalid(
200                "success_threshold",
201                "success_threshold must be greater than 0",
202            ));
203        }
204
205        if self.reset_timeout < Duration::from_secs(1) {
206            warnings.push(format!(
207                "reset_timeout {:?} is very short, may cause rapid state transitions",
208                self.reset_timeout
209            ));
210        }
211
212        Ok(ValidationResult::with_warnings(warnings))
213    }
214}
215
216/// Circuit breaker events for observability.
217///
218/// These events are emitted when the circuit breaker state changes
219/// or when significant actions occur.
220#[derive(Debug, Clone)]
221pub enum CircuitBreakerEvent {
222    /// The circuit breaker state has changed.
223    StateChanged {
224        /// The previous state
225        from: CircuitState,
226        /// The new state
227        to: CircuitState,
228    },
229
230    /// A request was rejected because the circuit is open.
231    RequestRejected {
232        /// The reason for rejection
233        reason: String,
234    },
235
236    /// A failure was recorded.
237    FailureRecorded {
238        /// The current failure count
239        count: u32,
240    },
241
242    /// A success was recorded.
243    SuccessRecorded {
244        /// The current success count (in HalfOpen state)
245        count: u32,
246    },
247}
248
249/// Circuit breaker for preventing cascading failures.
250///
251/// The circuit breaker monitors request success/failure patterns and
252/// automatically blocks requests to failing endpoints, allowing the
253/// system to recover.
254///
255/// # Thread Safety
256///
257/// This implementation uses atomic operations for all state management,
258/// making it safe to use from multiple threads without external locking.
259///
260/// # Example
261///
262/// ```rust
263/// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
264///
265/// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
266///
267/// // Before making a request
268/// if let Err(e) = breaker.allow_request() {
269///     // Circuit is open, handle the error
270///     println!("Circuit is open: {}", e);
271/// } else {
272///     // Make the request
273///     let success = true; // result of the request
274///     if success {
275///         breaker.record_success();
276///     } else {
277///         breaker.record_failure();
278///     }
279/// }
280/// ```
281#[derive(Debug)]
282pub struct CircuitBreaker {
283    /// Current state (stored as u8 for atomic operations)
284    state: AtomicU8,
285
286    /// Number of consecutive failures in Closed state
287    failure_count: AtomicU32,
288
289    /// Number of consecutive successes in HalfOpen state
290    success_count: AtomicU32,
291
292    /// Timestamp of the last failure (milliseconds since Unix epoch)
293    last_failure_time: AtomicI64,
294
295    /// Configuration
296    config: CircuitBreakerConfig,
297
298    /// Optional event sender for observability
299    event_tx: Option<mpsc::UnboundedSender<CircuitBreakerEvent>>,
300}
301
302impl CircuitBreaker {
303    /// Creates a new circuit breaker with the given configuration.
304    ///
305    /// # Arguments
306    ///
307    /// * `config` - Circuit breaker configuration
308    ///
309    /// # Example
310    ///
311    /// ```rust
312    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
313    ///
314    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
315    /// ```
316    pub fn new(config: CircuitBreakerConfig) -> Self {
317        Self {
318            state: AtomicU8::new(CircuitState::Closed as u8),
319            failure_count: AtomicU32::new(0),
320            success_count: AtomicU32::new(0),
321            last_failure_time: AtomicI64::new(0),
322            config,
323            event_tx: None,
324        }
325    }
326
327    /// Creates a new circuit breaker with an event channel for observability.
328    ///
329    /// # Arguments
330    ///
331    /// * `config` - Circuit breaker configuration
332    /// * `event_tx` - Channel sender for circuit breaker events
333    ///
334    /// # Example
335    ///
336    /// ```rust
337    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerEvent};
338    /// use tokio::sync::mpsc;
339    ///
340    /// let (tx, mut rx) = mpsc::unbounded_channel::<CircuitBreakerEvent>();
341    /// let breaker = CircuitBreaker::with_events(CircuitBreakerConfig::default(), tx);
342    ///
343    /// // Events will be sent to the channel when state changes occur
344    /// ```
345    pub fn with_events(
346        config: CircuitBreakerConfig,
347        event_tx: mpsc::UnboundedSender<CircuitBreakerEvent>,
348    ) -> Self {
349        Self {
350            state: AtomicU8::new(CircuitState::Closed as u8),
351            failure_count: AtomicU32::new(0),
352            success_count: AtomicU32::new(0),
353            last_failure_time: AtomicI64::new(0),
354            config,
355            event_tx: Some(event_tx),
356        }
357    }
358
359    /// Checks if a request should be allowed.
360    ///
361    /// # Returns
362    ///
363    /// Returns `Ok(())` if the request is allowed.
364    /// Returns `Err(Error::ResourceExhausted)` if the circuit is open.
365    ///
366    /// # State Transitions
367    ///
368    /// - **Closed**: Always allows requests
369    /// - **Open**: Checks if `reset_timeout` has elapsed; if so, transitions to HalfOpen
370    /// - **HalfOpen**: Allows requests (for testing recovery)
371    ///
372    /// # Example
373    ///
374    /// ```rust
375    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
376    ///
377    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
378    ///
379    /// match breaker.allow_request() {
380    ///     Ok(()) => println!("Request allowed"),
381    ///     Err(e) => println!("Request blocked: {}", e),
382    /// }
383    /// ```
384    #[allow(clippy::cast_possible_truncation)]
385    pub fn allow_request(&self) -> Result<()> {
386        let state = self.state();
387
388        match state {
389            CircuitState::Closed => {
390                debug!("Circuit breaker: Closed state, allowing request");
391                Ok(())
392            }
393            CircuitState::Open => {
394                // Check if reset timeout has elapsed
395                let last_failure = self.last_failure_time.load(Ordering::Acquire);
396                let now = chrono::Utc::now().timestamp_millis();
397                let elapsed_ms = (now - last_failure).max(0) as u64;
398                let elapsed = Duration::from_millis(elapsed_ms);
399
400                if elapsed >= self.config.reset_timeout {
401                    // Safe truncation: reset_timeout is typically seconds/minutes, not years
402                    let reset_timeout_ms =
403                        self.config
404                            .reset_timeout
405                            .as_millis()
406                            .min(u128::from(u64::MAX)) as u64;
407                    info!(
408                        elapsed_ms = elapsed_ms,
409                        reset_timeout_ms = reset_timeout_ms,
410                        "Circuit breaker: Reset timeout elapsed, transitioning to HalfOpen"
411                    );
412                    self.transition_to(CircuitState::HalfOpen);
413                    Ok(())
414                } else {
415                    // Safe truncation: reset_timeout is typically seconds/minutes, not years
416                    let reset_timeout_ms =
417                        self.config
418                            .reset_timeout
419                            .as_millis()
420                            .min(u128::from(u64::MAX)) as u64;
421                    let remaining_ms = reset_timeout_ms.saturating_sub(elapsed_ms);
422                    warn!(
423                        remaining_ms = remaining_ms,
424                        "Circuit breaker: Open state, rejecting request"
425                    );
426                    self.emit_event(CircuitBreakerEvent::RequestRejected {
427                        reason: format!("Circuit breaker is open, retry after {remaining_ms}ms"),
428                    });
429                    Err(Error::resource_exhausted("Circuit breaker is open"))
430                }
431            }
432            CircuitState::HalfOpen => {
433                debug!("Circuit breaker: HalfOpen state, allowing test request");
434                Ok(())
435            }
436        }
437    }
438
439    /// Records a successful request.
440    ///
441    /// # State Transitions
442    ///
443    /// - **Closed**: Resets the failure count to 0
444    /// - **HalfOpen**: Increments success count; if it reaches `success_threshold`,
445    ///   transitions to Closed
446    /// - **Open**: No effect
447    ///
448    /// # Example
449    ///
450    /// ```rust
451    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
452    ///
453    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
454    ///
455    /// // After a successful request
456    /// breaker.record_success();
457    /// ```
458    pub fn record_success(&self) {
459        let state = self.state();
460
461        match state {
462            CircuitState::Closed => {
463                // Reset failure count on success
464                self.failure_count.store(0, Ordering::Release);
465                debug!("Circuit breaker: Success recorded in Closed state, failure count reset");
466            }
467            CircuitState::HalfOpen => {
468                let count = self.success_count.fetch_add(1, Ordering::AcqRel) + 1;
469                debug!(
470                    success_count = count,
471                    threshold = self.config.success_threshold,
472                    "Circuit breaker: Success recorded in HalfOpen state"
473                );
474
475                self.emit_event(CircuitBreakerEvent::SuccessRecorded { count });
476
477                if count >= self.config.success_threshold {
478                    info!(
479                        success_count = count,
480                        "Circuit breaker: Success threshold reached, transitioning to Closed"
481                    );
482                    self.transition_to(CircuitState::Closed);
483                    self.failure_count.store(0, Ordering::Release);
484                    self.success_count.store(0, Ordering::Release);
485                }
486            }
487            CircuitState::Open => {
488                // No effect in Open state
489                debug!("Circuit breaker: Success recorded in Open state (no effect)");
490            }
491        }
492    }
493
494    /// Records a failed request.
495    ///
496    /// # State Transitions
497    ///
498    /// - **Closed**: Increments failure count; if it reaches `failure_threshold`,
499    ///   transitions to Open
500    /// - **HalfOpen**: Transitions immediately to Open (test request failed)
501    /// - **Open**: No effect
502    ///
503    /// # Example
504    ///
505    /// ```rust
506    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
507    ///
508    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
509    ///
510    /// // After a failed request
511    /// breaker.record_failure();
512    /// ```
513    pub fn record_failure(&self) {
514        // ATOMIC STATE TRANSITION with failure counting
515        // We use a CAS loop that combines state check and transition to avoid race conditions
516        // where multiple threads could update last_failure_time inconsistently.
517        let mut current_state = self.state.load(Ordering::Acquire);
518
519        loop {
520            if current_state == CircuitState::Open as u8 {
521                // Already Open - just increment failure count for monitoring, no state transition needed
522                let prev_failures = self.failure_count.fetch_add(1, Ordering::AcqRel);
523                let current_failures = prev_failures + 1;
524
525                debug!(
526                    failure_count = current_failures,
527                    threshold = self.config.failure_threshold,
528                    "Circuit breaker: Failure recorded (already Open)"
529                );
530
531                self.emit_event(CircuitBreakerEvent::FailureRecorded {
532                    count: current_failures,
533                });
534                return;
535            }
536
537            // For Closed/HalfOpen states: atomically increment and check threshold
538            let prev_failures = self.failure_count.fetch_add(1, Ordering::AcqRel);
539            let current_failures = prev_failures + 1;
540
541            debug!(
542                failure_count = current_failures,
543                threshold = self.config.failure_threshold,
544                "Circuit breaker: Failure recorded"
545            );
546
547            self.emit_event(CircuitBreakerEvent::FailureRecorded {
548                count: current_failures,
549            });
550
551            // Check if we should transition to Open
552            if current_failures >= self.config.failure_threshold {
553                // CAS: Atomically check and transition state
554                // Only the thread that successfully transitions will update last_failure_time
555                match self.state.compare_exchange_weak(
556                    current_state,
557                    CircuitState::Open as u8,
558                    Ordering::AcqRel,
559                    Ordering::Acquire,
560                ) {
561                    Ok(_) => {
562                        // Successfully transitioned to Open state - only this thread updates timestamp
563                        warn!(
564                            failure_count = current_failures,
565                            "Circuit breaker: Transitioned to Open state (atomic)"
566                        );
567                        self.last_failure_time
568                            .store(chrono::Utc::now().timestamp_millis(), Ordering::Release);
569                        return;
570                    }
571                    Err(actual) => {
572                        // CAS failed: another thread modified state
573                        // If it's now Open, we're done (another thread handled the transition)
574                        if actual == CircuitState::Open as u8 {
575                            return;
576                        }
577                        // Otherwise retry with the new state
578                        current_state = actual;
579                        // Note: failure_count was already incremented, which is correct
580                        // We just need to retry the state transition
581                        continue;
582                    }
583                }
584            }
585
586            // Threshold not reached, we're done
587            return;
588        }
589    }
590
591    /// Returns the current state of the circuit breaker.
592    ///
593    /// # Example
594    ///
595    /// ```rust
596    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitState};
597    ///
598    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
599    /// assert_eq!(breaker.state(), CircuitState::Closed);
600    /// ```
601    pub fn state(&self) -> CircuitState {
602        CircuitState::from_u8(self.state.load(Ordering::Acquire))
603    }
604
605    /// Returns the current failure count.
606    ///
607    /// This is primarily useful for monitoring and debugging.
608    pub fn failure_count(&self) -> u32 {
609        self.failure_count.load(Ordering::Acquire)
610    }
611
612    /// Returns the current success count (in HalfOpen state).
613    ///
614    /// This is primarily useful for monitoring and debugging.
615    pub fn success_count(&self) -> u32 {
616        self.success_count.load(Ordering::Acquire)
617    }
618
619    /// Returns a reference to the configuration.
620    pub fn config(&self) -> &CircuitBreakerConfig {
621        &self.config
622    }
623
624    /// Resets the circuit breaker to its initial state (Closed).
625    ///
626    /// This can be useful for testing or manual intervention.
627    ///
628    /// # Example
629    ///
630    /// ```rust
631    /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
632    ///
633    /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
634    /// // ... some operations that opened the circuit ...
635    /// breaker.reset();
636    /// // Circuit is now Closed again
637    /// ```
638    pub fn reset(&self) {
639        let old_state = self.state();
640        self.state
641            .store(CircuitState::Closed as u8, Ordering::Release);
642        self.failure_count.store(0, Ordering::Release);
643        self.success_count.store(0, Ordering::Release);
644        self.last_failure_time.store(0, Ordering::Release);
645
646        if old_state != CircuitState::Closed {
647            info!(
648                from = ?old_state,
649                "Circuit breaker: Manual reset to Closed state"
650            );
651            self.emit_event(CircuitBreakerEvent::StateChanged {
652                from: old_state,
653                to: CircuitState::Closed,
654            });
655        }
656    }
657
658    /// Transitions to a new state and emits an event.
659    fn transition_to(&self, new_state: CircuitState) {
660        let old_state = self.state();
661        self.state.store(new_state as u8, Ordering::Release);
662
663        self.emit_event(CircuitBreakerEvent::StateChanged {
664            from: old_state,
665            to: new_state,
666        });
667    }
668
669    /// Emits an event if an event channel is configured.
670    fn emit_event(&self, event: CircuitBreakerEvent) {
671        if let Some(ref tx) = self.event_tx {
672            // Ignore send errors (receiver may have been dropped)
673            let _ = tx.send(event);
674        }
675    }
676}
677
678#[cfg(test)]
679#[allow(clippy::disallowed_methods)] // unwrap() is acceptable in tests
680mod tests {
681    use super::*;
682
683    #[test]
684    fn test_circuit_state_from_u8() {
685        assert_eq!(CircuitState::from_u8(0), CircuitState::Closed);
686        assert_eq!(CircuitState::from_u8(1), CircuitState::Open);
687        assert_eq!(CircuitState::from_u8(2), CircuitState::HalfOpen);
688        assert_eq!(CircuitState::from_u8(255), CircuitState::Closed); // Invalid defaults to Closed
689    }
690
691    #[test]
692    fn test_circuit_breaker_config_default() {
693        let config = CircuitBreakerConfig::default();
694        assert_eq!(config.failure_threshold, 5);
695        assert_eq!(config.reset_timeout, Duration::from_secs(30));
696        assert_eq!(config.success_threshold, 1);
697    }
698
699    #[test]
700    fn test_circuit_breaker_config_new() {
701        let config = CircuitBreakerConfig::new(10, Duration::from_secs(60), 2);
702        assert_eq!(config.failure_threshold, 10);
703        assert_eq!(config.reset_timeout, Duration::from_secs(60));
704        assert_eq!(config.success_threshold, 2);
705    }
706
707    #[test]
708    fn test_circuit_breaker_config_validate_default() {
709        let config = CircuitBreakerConfig::default();
710        let result = config.validate();
711        assert!(result.is_ok());
712        assert!(result.unwrap().warnings.is_empty());
713    }
714
715    #[test]
716    fn test_circuit_breaker_config_validate_zero_failure_threshold() {
717        let config = CircuitBreakerConfig {
718            failure_threshold: 0,
719            ..Default::default()
720        };
721        let result = config.validate();
722        assert!(result.is_err());
723        let err = result.unwrap_err();
724        assert_eq!(err.field_name(), "failure_threshold");
725    }
726
727    #[test]
728    fn test_circuit_breaker_config_validate_zero_success_threshold() {
729        let config = CircuitBreakerConfig {
730            success_threshold: 0,
731            ..Default::default()
732        };
733        let result = config.validate();
734        assert!(result.is_err());
735        let err = result.unwrap_err();
736        assert_eq!(err.field_name(), "success_threshold");
737    }
738
739    #[test]
740    fn test_circuit_breaker_config_validate_short_reset_timeout_warning() {
741        let config = CircuitBreakerConfig {
742            reset_timeout: Duration::from_millis(500),
743            ..Default::default()
744        };
745        let result = config.validate();
746        assert!(result.is_ok());
747        let validation_result = result.unwrap();
748        assert!(!validation_result.warnings.is_empty());
749        assert!(validation_result.warnings[0].contains("reset_timeout"));
750    }
751
752    #[test]
753    fn test_circuit_breaker_initial_state() {
754        let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
755        assert_eq!(breaker.state(), CircuitState::Closed);
756        assert_eq!(breaker.failure_count(), 0);
757        assert_eq!(breaker.success_count(), 0);
758    }
759
760    #[test]
761    fn test_circuit_breaker_allow_request_closed() {
762        let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
763        assert!(breaker.allow_request().is_ok());
764    }
765
766    #[test]
767    fn test_circuit_breaker_record_success_closed() {
768        let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
769
770        // Record some failures first
771        breaker.record_failure();
772        breaker.record_failure();
773        assert_eq!(breaker.failure_count(), 2);
774
775        // Success should reset failure count
776        breaker.record_success();
777        assert_eq!(breaker.failure_count(), 0);
778        assert_eq!(breaker.state(), CircuitState::Closed);
779    }
780
781    #[test]
782    fn test_circuit_breaker_transition_to_open() {
783        let config = CircuitBreakerConfig {
784            failure_threshold: 3,
785            ..Default::default()
786        };
787        let breaker = CircuitBreaker::new(config);
788
789        // Record failures up to threshold
790        breaker.record_failure();
791        assert_eq!(breaker.state(), CircuitState::Closed);
792        breaker.record_failure();
793        assert_eq!(breaker.state(), CircuitState::Closed);
794        breaker.record_failure();
795
796        // Should transition to Open after reaching threshold
797        assert_eq!(breaker.state(), CircuitState::Open);
798    }
799
800    #[test]
801    fn test_circuit_breaker_reject_when_open() {
802        let config = CircuitBreakerConfig {
803            failure_threshold: 1,
804            reset_timeout: Duration::from_secs(60), // Long timeout
805            ..Default::default()
806        };
807        let breaker = CircuitBreaker::new(config);
808
809        // Open the circuit
810        breaker.record_failure();
811        assert_eq!(breaker.state(), CircuitState::Open);
812
813        // Request should be rejected
814        let result = breaker.allow_request();
815        assert!(result.is_err());
816
817        if let Err(e) = result {
818            assert!(e.as_resource_exhausted().is_some());
819        }
820    }
821
822    #[test]
823    fn test_circuit_breaker_transition_to_half_open() {
824        let config = CircuitBreakerConfig {
825            failure_threshold: 1,
826            reset_timeout: Duration::from_millis(10), // Very short timeout for testing
827            ..Default::default()
828        };
829        let breaker = CircuitBreaker::new(config);
830
831        // Open the circuit
832        breaker.record_failure();
833        assert_eq!(breaker.state(), CircuitState::Open);
834
835        // Wait for reset timeout
836        std::thread::sleep(Duration::from_millis(20));
837
838        // Request should be allowed and state should transition to HalfOpen
839        assert!(breaker.allow_request().is_ok());
840        assert_eq!(breaker.state(), CircuitState::HalfOpen);
841    }
842
843    #[test]
844    fn test_circuit_breaker_half_open_success_closes() {
845        let config = CircuitBreakerConfig {
846            failure_threshold: 1,
847            reset_timeout: Duration::from_millis(10),
848            success_threshold: 1,
849        };
850        let breaker = CircuitBreaker::new(config);
851
852        // Open the circuit
853        breaker.record_failure();
854        assert_eq!(breaker.state(), CircuitState::Open);
855
856        // Wait for reset timeout
857        std::thread::sleep(Duration::from_millis(20));
858
859        // Transition to HalfOpen
860        assert!(breaker.allow_request().is_ok());
861        assert_eq!(breaker.state(), CircuitState::HalfOpen);
862
863        // Success should close the circuit
864        breaker.record_success();
865        assert_eq!(breaker.state(), CircuitState::Closed);
866        assert_eq!(breaker.failure_count(), 0);
867        assert_eq!(breaker.success_count(), 0);
868    }
869
870    #[test]
871    fn test_circuit_breaker_half_open_failure_opens() {
872        let config = CircuitBreakerConfig {
873            failure_threshold: 1,
874            reset_timeout: Duration::from_millis(10),
875            success_threshold: 1,
876        };
877        let breaker = CircuitBreaker::new(config);
878
879        // Open the circuit
880        breaker.record_failure();
881        assert_eq!(breaker.state(), CircuitState::Open);
882
883        // Wait for reset timeout
884        std::thread::sleep(Duration::from_millis(20));
885
886        // Transition to HalfOpen
887        assert!(breaker.allow_request().is_ok());
888        assert_eq!(breaker.state(), CircuitState::HalfOpen);
889
890        // Failure should reopen the circuit
891        breaker.record_failure();
892        assert_eq!(breaker.state(), CircuitState::Open);
893    }
894
895    #[test]
896    fn test_circuit_breaker_multiple_successes_required() {
897        let config = CircuitBreakerConfig {
898            failure_threshold: 1,
899            reset_timeout: Duration::from_millis(10),
900            success_threshold: 3, // Require 3 successes
901        };
902        let breaker = CircuitBreaker::new(config);
903
904        // Open the circuit
905        breaker.record_failure();
906        std::thread::sleep(Duration::from_millis(20));
907        assert!(breaker.allow_request().is_ok());
908        assert_eq!(breaker.state(), CircuitState::HalfOpen);
909
910        // First success - still HalfOpen
911        breaker.record_success();
912        assert_eq!(breaker.state(), CircuitState::HalfOpen);
913        assert_eq!(breaker.success_count(), 1);
914
915        // Second success - still HalfOpen
916        breaker.record_success();
917        assert_eq!(breaker.state(), CircuitState::HalfOpen);
918        assert_eq!(breaker.success_count(), 2);
919
920        // Third success - should close
921        breaker.record_success();
922        assert_eq!(breaker.state(), CircuitState::Closed);
923    }
924
925    #[test]
926    fn test_circuit_breaker_reset() {
927        let config = CircuitBreakerConfig {
928            failure_threshold: 1,
929            ..Default::default()
930        };
931        let breaker = CircuitBreaker::new(config);
932
933        // Open the circuit
934        breaker.record_failure();
935        assert_eq!(breaker.state(), CircuitState::Open);
936
937        // Reset should close the circuit
938        breaker.reset();
939        assert_eq!(breaker.state(), CircuitState::Closed);
940        assert_eq!(breaker.failure_count(), 0);
941        assert_eq!(breaker.success_count(), 0);
942    }
943
944    #[tokio::test]
945    async fn test_circuit_breaker_with_events() {
946        let (tx, mut rx) = mpsc::unbounded_channel::<CircuitBreakerEvent>();
947        let config = CircuitBreakerConfig {
948            failure_threshold: 2,
949            reset_timeout: Duration::from_millis(10),
950            success_threshold: 1,
951        };
952        let breaker = CircuitBreaker::with_events(config, tx);
953
954        // Record failures
955        breaker.record_failure();
956        breaker.record_failure(); // This should trigger state change
957
958        // Check events
959        let event1 = rx.recv().await.unwrap();
960        assert!(matches!(
961            event1,
962            CircuitBreakerEvent::FailureRecorded { count: 1 }
963        ));
964
965        let event2 = rx.recv().await.unwrap();
966        assert!(matches!(
967            event2,
968            CircuitBreakerEvent::FailureRecorded { count: 2 }
969        ));
970
971        let event3 = rx.recv().await.unwrap();
972        assert!(matches!(
973            event3,
974            CircuitBreakerEvent::StateChanged {
975                from: CircuitState::Closed,
976                to: CircuitState::Open
977            }
978        ));
979    }
980
981    #[test]
982    fn test_circuit_breaker_thread_safety() {
983        use std::sync::Arc;
984        use std::thread;
985
986        let breaker = Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
987            failure_threshold: 100,
988            ..Default::default()
989        }));
990
991        let mut handles = vec![];
992
993        // Spawn multiple threads recording failures
994        for _ in 0..10 {
995            let breaker_clone = Arc::clone(&breaker);
996            handles.push(thread::spawn(move || {
997                for _ in 0..10 {
998                    breaker_clone.record_failure();
999                }
1000            }));
1001        }
1002
1003        // Wait for all threads to complete
1004        for handle in handles {
1005            handle.join().unwrap();
1006        }
1007
1008        // Should have recorded 100 failures and transitioned to Open
1009        assert_eq!(breaker.state(), CircuitState::Open);
1010    }
1011}