ccxt_core/circuit_breaker.rs
1//! Circuit Breaker module for preventing cascading failures.
2//!
3//! This module implements the Circuit Breaker pattern to protect against
4//! continuously failing endpoints. When failures exceed a threshold, the
5//! circuit "opens" and rejects requests immediately, allowing the system
6//! to recover.
7//!
8//! # States
9//!
10//! The circuit breaker has three states:
11//!
12//! - **Closed**: Normal operation, requests are allowed
13//! - **Open**: Blocking requests due to failures, requests are rejected immediately
14//! - **HalfOpen**: Testing if the service has recovered, allows limited requests
15//!
16//! # State Transitions
17//!
18//! ```text
19//! ┌─────────┐ failure_count >= threshold ┌──────┐
20//! │ Closed │ ──────────────────────────▶ │ Open │
21//! └─────────┘ └──────┘
22//! ▲ │
23//! │ test request succeeds │ reset_timeout elapsed
24//! │ ▼
25//! │ ┌──────────┐
26//! └──────────────────────────────── │ HalfOpen │
27//! └──────────┘
28//! │
29//! │ test request fails
30//! ▼
31//! ┌──────┐
32//! │ Open │
33//! └──────┘
34//! ```
35//!
36//! # Example
37//!
38//! ```rust
39//! use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitState};
40//! use std::time::Duration;
41//!
42//! // Create a circuit breaker with custom configuration
43//! let config = CircuitBreakerConfig {
44//! failure_threshold: 5,
45//! reset_timeout: Duration::from_secs(30),
46//! success_threshold: 1,
47//! };
48//! let breaker = CircuitBreaker::new(config);
49//!
50//! // Check if request is allowed
51//! if breaker.allow_request().is_ok() {
52//! // Make the request
53//! // On success:
54//! breaker.record_success();
55//! // On failure:
56//! // breaker.record_failure();
57//! }
58//! ```
59
60use crate::error::{ConfigValidationError, Error, Result, ValidationResult};
61use std::sync::atomic::{AtomicI64, AtomicU8, AtomicU32, Ordering};
62use std::time::Duration;
63use tokio::sync::mpsc;
64use tracing::{debug, info, warn};
65
66/// Circuit breaker states.
67///
68/// The circuit breaker transitions between these states based on
69/// request success/failure patterns.
70#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71#[repr(u8)]
72pub enum CircuitState {
73 /// Normal operation, requests are allowed.
74 /// Failures are counted, and if they exceed the threshold,
75 /// the circuit transitions to Open.
76 Closed = 0,
77
78 /// Blocking requests due to failures.
79 /// All requests are rejected immediately with a `ResourceExhausted` error.
80 /// After `reset_timeout` elapses, transitions to HalfOpen.
81 Open = 1,
82
83 /// Testing if the service has recovered.
84 /// Allows a limited number of test requests.
85 /// If successful, transitions to Closed; if failed, transitions back to Open.
86 HalfOpen = 2,
87}
88
89impl CircuitState {
90 /// Converts a u8 value to a CircuitState.
91 fn from_u8(value: u8) -> Self {
92 match value {
93 1 => CircuitState::Open,
94 2 => CircuitState::HalfOpen,
95 _ => CircuitState::Closed, // Default to Closed for invalid values (including 0)
96 }
97 }
98}
99
100/// Circuit breaker configuration.
101///
102/// # Example
103///
104/// ```rust
105/// use ccxt_core::circuit_breaker::CircuitBreakerConfig;
106/// use std::time::Duration;
107///
108/// let config = CircuitBreakerConfig {
109/// failure_threshold: 5,
110/// reset_timeout: Duration::from_secs(30),
111/// success_threshold: 1,
112/// };
113/// ```
114#[derive(Debug, Clone)]
115pub struct CircuitBreakerConfig {
116 /// Number of consecutive failures before opening the circuit.
117 ///
118 /// When the failure count reaches this threshold, the circuit
119 /// transitions from Closed to Open state.
120 ///
121 /// Default: 5
122 pub failure_threshold: u32,
123
124 /// Time to wait in Open state before transitioning to HalfOpen.
125 ///
126 /// After this duration elapses, the circuit breaker will allow
127 /// a test request to check if the service has recovered.
128 ///
129 /// Default: 30 seconds
130 pub reset_timeout: Duration,
131
132 /// Number of consecutive successes needed to close the circuit from HalfOpen.
133 ///
134 /// When in HalfOpen state, this many successful requests are required
135 /// before transitioning back to Closed state.
136 ///
137 /// Default: 1
138 pub success_threshold: u32,
139}
140
141impl Default for CircuitBreakerConfig {
142 fn default() -> Self {
143 Self {
144 failure_threshold: 5,
145 reset_timeout: Duration::from_secs(30),
146 success_threshold: 1,
147 }
148 }
149}
150
151impl CircuitBreakerConfig {
152 /// Creates a new circuit breaker configuration with the given parameters.
153 ///
154 /// # Arguments
155 ///
156 /// * `failure_threshold` - Number of failures before opening the circuit
157 /// * `reset_timeout` - Time to wait before testing recovery
158 /// * `success_threshold` - Number of successes needed to close the circuit
159 pub fn new(failure_threshold: u32, reset_timeout: Duration, success_threshold: u32) -> Self {
160 Self {
161 failure_threshold,
162 reset_timeout,
163 success_threshold,
164 }
165 }
166
167 /// Validates the circuit breaker configuration.
168 ///
169 /// # Returns
170 ///
171 /// Returns `Ok(ValidationResult)` if the configuration is valid.
172 /// Returns `Err(ConfigValidationError)` if the configuration is invalid.
173 ///
174 /// # Validation Rules
175 ///
176 /// - `failure_threshold` must be > 0
177 /// - `success_threshold` must be > 0
178 /// - `reset_timeout` must be >= 1 second
179 ///
180 /// # Example
181 ///
182 /// ```rust
183 /// use ccxt_core::circuit_breaker::CircuitBreakerConfig;
184 ///
185 /// let config = CircuitBreakerConfig::default();
186 /// assert!(config.validate().is_ok());
187 /// ```
188 pub fn validate(&self) -> std::result::Result<ValidationResult, ConfigValidationError> {
189 let mut warnings = Vec::new();
190
191 if self.failure_threshold == 0 {
192 return Err(ConfigValidationError::invalid(
193 "failure_threshold",
194 "failure_threshold must be greater than 0",
195 ));
196 }
197
198 if self.success_threshold == 0 {
199 return Err(ConfigValidationError::invalid(
200 "success_threshold",
201 "success_threshold must be greater than 0",
202 ));
203 }
204
205 if self.reset_timeout < Duration::from_secs(1) {
206 warnings.push(format!(
207 "reset_timeout {:?} is very short, may cause rapid state transitions",
208 self.reset_timeout
209 ));
210 }
211
212 Ok(ValidationResult::with_warnings(warnings))
213 }
214}
215
216/// Circuit breaker events for observability.
217///
218/// These events are emitted when the circuit breaker state changes
219/// or when significant actions occur.
220#[derive(Debug, Clone)]
221pub enum CircuitBreakerEvent {
222 /// The circuit breaker state has changed.
223 StateChanged {
224 /// The previous state
225 from: CircuitState,
226 /// The new state
227 to: CircuitState,
228 },
229
230 /// A request was rejected because the circuit is open.
231 RequestRejected {
232 /// The reason for rejection
233 reason: String,
234 },
235
236 /// A failure was recorded.
237 FailureRecorded {
238 /// The current failure count
239 count: u32,
240 },
241
242 /// A success was recorded.
243 SuccessRecorded {
244 /// The current success count (in HalfOpen state)
245 count: u32,
246 },
247}
248
249/// Circuit breaker for preventing cascading failures.
250///
251/// The circuit breaker monitors request success/failure patterns and
252/// automatically blocks requests to failing endpoints, allowing the
253/// system to recover.
254///
255/// # Thread Safety
256///
257/// This implementation uses atomic operations for all state management,
258/// making it safe to use from multiple threads without external locking.
259///
260/// # Example
261///
262/// ```rust
263/// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
264///
265/// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
266///
267/// // Before making a request
268/// if let Err(e) = breaker.allow_request() {
269/// // Circuit is open, handle the error
270/// println!("Circuit is open: {}", e);
271/// } else {
272/// // Make the request
273/// let success = true; // result of the request
274/// if success {
275/// breaker.record_success();
276/// } else {
277/// breaker.record_failure();
278/// }
279/// }
280/// ```
281#[derive(Debug)]
282pub struct CircuitBreaker {
283 /// Current state (stored as u8 for atomic operations)
284 state: AtomicU8,
285
286 /// Number of consecutive failures in Closed state
287 failure_count: AtomicU32,
288
289 /// Number of consecutive successes in HalfOpen state
290 success_count: AtomicU32,
291
292 /// Timestamp of the last failure (milliseconds since Unix epoch)
293 last_failure_time: AtomicI64,
294
295 /// Configuration
296 config: CircuitBreakerConfig,
297
298 /// Optional event sender for observability
299 event_tx: Option<mpsc::UnboundedSender<CircuitBreakerEvent>>,
300}
301
302impl CircuitBreaker {
303 /// Creates a new circuit breaker with the given configuration.
304 ///
305 /// # Arguments
306 ///
307 /// * `config` - Circuit breaker configuration
308 ///
309 /// # Example
310 ///
311 /// ```rust
312 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
313 ///
314 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
315 /// ```
316 pub fn new(config: CircuitBreakerConfig) -> Self {
317 Self {
318 state: AtomicU8::new(CircuitState::Closed as u8),
319 failure_count: AtomicU32::new(0),
320 success_count: AtomicU32::new(0),
321 last_failure_time: AtomicI64::new(0),
322 config,
323 event_tx: None,
324 }
325 }
326
327 /// Creates a new circuit breaker with an event channel for observability.
328 ///
329 /// # Arguments
330 ///
331 /// * `config` - Circuit breaker configuration
332 /// * `event_tx` - Channel sender for circuit breaker events
333 ///
334 /// # Example
335 ///
336 /// ```rust
337 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitBreakerEvent};
338 /// use tokio::sync::mpsc;
339 ///
340 /// let (tx, mut rx) = mpsc::unbounded_channel::<CircuitBreakerEvent>();
341 /// let breaker = CircuitBreaker::with_events(CircuitBreakerConfig::default(), tx);
342 ///
343 /// // Events will be sent to the channel when state changes occur
344 /// ```
345 pub fn with_events(
346 config: CircuitBreakerConfig,
347 event_tx: mpsc::UnboundedSender<CircuitBreakerEvent>,
348 ) -> Self {
349 Self {
350 state: AtomicU8::new(CircuitState::Closed as u8),
351 failure_count: AtomicU32::new(0),
352 success_count: AtomicU32::new(0),
353 last_failure_time: AtomicI64::new(0),
354 config,
355 event_tx: Some(event_tx),
356 }
357 }
358
359 /// Checks if a request should be allowed.
360 ///
361 /// # Returns
362 ///
363 /// Returns `Ok(())` if the request is allowed.
364 /// Returns `Err(Error::ResourceExhausted)` if the circuit is open.
365 ///
366 /// # State Transitions
367 ///
368 /// - **Closed**: Always allows requests
369 /// - **Open**: Checks if `reset_timeout` has elapsed; if so, transitions to HalfOpen
370 /// - **HalfOpen**: Allows requests (for testing recovery)
371 ///
372 /// # Example
373 ///
374 /// ```rust
375 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
376 ///
377 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
378 ///
379 /// match breaker.allow_request() {
380 /// Ok(()) => println!("Request allowed"),
381 /// Err(e) => println!("Request blocked: {}", e),
382 /// }
383 /// ```
384 #[allow(clippy::cast_possible_truncation)]
385 pub fn allow_request(&self) -> Result<()> {
386 let state = self.state();
387
388 match state {
389 CircuitState::Closed => {
390 debug!("Circuit breaker: Closed state, allowing request");
391 Ok(())
392 }
393 CircuitState::Open => {
394 // Check if reset timeout has elapsed
395 let last_failure = self.last_failure_time.load(Ordering::Acquire);
396 let now = chrono::Utc::now().timestamp_millis();
397 let elapsed_ms = (now - last_failure).max(0) as u64;
398 let elapsed = Duration::from_millis(elapsed_ms);
399
400 if elapsed >= self.config.reset_timeout {
401 // Safe truncation: reset_timeout is typically seconds/minutes, not years
402 let reset_timeout_ms =
403 self.config
404 .reset_timeout
405 .as_millis()
406 .min(u128::from(u64::MAX)) as u64;
407 info!(
408 elapsed_ms = elapsed_ms,
409 reset_timeout_ms = reset_timeout_ms,
410 "Circuit breaker: Reset timeout elapsed, transitioning to HalfOpen"
411 );
412 self.transition_to(CircuitState::HalfOpen);
413 Ok(())
414 } else {
415 // Safe truncation: reset_timeout is typically seconds/minutes, not years
416 let reset_timeout_ms =
417 self.config
418 .reset_timeout
419 .as_millis()
420 .min(u128::from(u64::MAX)) as u64;
421 let remaining_ms = reset_timeout_ms.saturating_sub(elapsed_ms);
422 warn!(
423 remaining_ms = remaining_ms,
424 "Circuit breaker: Open state, rejecting request"
425 );
426 self.emit_event(CircuitBreakerEvent::RequestRejected {
427 reason: format!("Circuit breaker is open, retry after {remaining_ms}ms"),
428 });
429 Err(Error::resource_exhausted("Circuit breaker is open"))
430 }
431 }
432 CircuitState::HalfOpen => {
433 debug!("Circuit breaker: HalfOpen state, allowing test request");
434 Ok(())
435 }
436 }
437 }
438
439 /// Records a successful request.
440 ///
441 /// # State Transitions
442 ///
443 /// - **Closed**: Resets the failure count to 0
444 /// - **HalfOpen**: Increments success count; if it reaches `success_threshold`,
445 /// transitions to Closed
446 /// - **Open**: No effect
447 ///
448 /// # Example
449 ///
450 /// ```rust
451 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
452 ///
453 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
454 ///
455 /// // After a successful request
456 /// breaker.record_success();
457 /// ```
458 pub fn record_success(&self) {
459 let state = self.state();
460
461 match state {
462 CircuitState::Closed => {
463 // Reset failure count on success
464 self.failure_count.store(0, Ordering::Release);
465 debug!("Circuit breaker: Success recorded in Closed state, failure count reset");
466 }
467 CircuitState::HalfOpen => {
468 let count = self.success_count.fetch_add(1, Ordering::AcqRel) + 1;
469 debug!(
470 success_count = count,
471 threshold = self.config.success_threshold,
472 "Circuit breaker: Success recorded in HalfOpen state"
473 );
474
475 self.emit_event(CircuitBreakerEvent::SuccessRecorded { count });
476
477 if count >= self.config.success_threshold {
478 info!(
479 success_count = count,
480 "Circuit breaker: Success threshold reached, transitioning to Closed"
481 );
482 self.transition_to(CircuitState::Closed);
483 self.failure_count.store(0, Ordering::Release);
484 self.success_count.store(0, Ordering::Release);
485 }
486 }
487 CircuitState::Open => {
488 // No effect in Open state
489 debug!("Circuit breaker: Success recorded in Open state (no effect)");
490 }
491 }
492 }
493
494 /// Records a failed request.
495 ///
496 /// # State Transitions
497 ///
498 /// - **Closed**: Increments failure count; if it reaches `failure_threshold`,
499 /// transitions to Open
500 /// - **HalfOpen**: Transitions immediately to Open (test request failed)
501 /// - **Open**: No effect
502 ///
503 /// # Example
504 ///
505 /// ```rust
506 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
507 ///
508 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
509 ///
510 /// // After a failed request
511 /// breaker.record_failure();
512 /// ```
513 pub fn record_failure(&self) {
514 // ATOMIC STATE TRANSITION with failure counting
515 // We use a CAS loop that combines state check and transition to avoid race conditions
516 // where multiple threads could update last_failure_time inconsistently.
517 let mut current_state = self.state.load(Ordering::Acquire);
518
519 loop {
520 if current_state == CircuitState::Open as u8 {
521 // Already Open - just increment failure count for monitoring, no state transition needed
522 let prev_failures = self.failure_count.fetch_add(1, Ordering::AcqRel);
523 let current_failures = prev_failures + 1;
524
525 debug!(
526 failure_count = current_failures,
527 threshold = self.config.failure_threshold,
528 "Circuit breaker: Failure recorded (already Open)"
529 );
530
531 self.emit_event(CircuitBreakerEvent::FailureRecorded {
532 count: current_failures,
533 });
534 return;
535 }
536
537 // For Closed/HalfOpen states: atomically increment and check threshold
538 let prev_failures = self.failure_count.fetch_add(1, Ordering::AcqRel);
539 let current_failures = prev_failures + 1;
540
541 debug!(
542 failure_count = current_failures,
543 threshold = self.config.failure_threshold,
544 "Circuit breaker: Failure recorded"
545 );
546
547 self.emit_event(CircuitBreakerEvent::FailureRecorded {
548 count: current_failures,
549 });
550
551 // Check if we should transition to Open
552 if current_failures >= self.config.failure_threshold {
553 // CAS: Atomically check and transition state
554 // Only the thread that successfully transitions will update last_failure_time
555 match self.state.compare_exchange_weak(
556 current_state,
557 CircuitState::Open as u8,
558 Ordering::AcqRel,
559 Ordering::Acquire,
560 ) {
561 Ok(_) => {
562 // Successfully transitioned to Open state - only this thread updates timestamp
563 warn!(
564 failure_count = current_failures,
565 "Circuit breaker: Transitioned to Open state (atomic)"
566 );
567 self.last_failure_time
568 .store(chrono::Utc::now().timestamp_millis(), Ordering::Release);
569 return;
570 }
571 Err(actual) => {
572 // CAS failed: another thread modified state
573 // If it's now Open, we're done (another thread handled the transition)
574 if actual == CircuitState::Open as u8 {
575 return;
576 }
577 // Otherwise retry with the new state
578 current_state = actual;
579 // Note: failure_count was already incremented, which is correct
580 // We just need to retry the state transition
581 continue;
582 }
583 }
584 }
585
586 // Threshold not reached, we're done
587 return;
588 }
589 }
590
591 /// Returns the current state of the circuit breaker.
592 ///
593 /// # Example
594 ///
595 /// ```rust
596 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig, CircuitState};
597 ///
598 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
599 /// assert_eq!(breaker.state(), CircuitState::Closed);
600 /// ```
601 pub fn state(&self) -> CircuitState {
602 CircuitState::from_u8(self.state.load(Ordering::Acquire))
603 }
604
605 /// Returns the current failure count.
606 ///
607 /// This is primarily useful for monitoring and debugging.
608 pub fn failure_count(&self) -> u32 {
609 self.failure_count.load(Ordering::Acquire)
610 }
611
612 /// Returns the current success count (in HalfOpen state).
613 ///
614 /// This is primarily useful for monitoring and debugging.
615 pub fn success_count(&self) -> u32 {
616 self.success_count.load(Ordering::Acquire)
617 }
618
619 /// Returns a reference to the configuration.
620 pub fn config(&self) -> &CircuitBreakerConfig {
621 &self.config
622 }
623
624 /// Resets the circuit breaker to its initial state (Closed).
625 ///
626 /// This can be useful for testing or manual intervention.
627 ///
628 /// # Example
629 ///
630 /// ```rust
631 /// use ccxt_core::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
632 ///
633 /// let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
634 /// // ... some operations that opened the circuit ...
635 /// breaker.reset();
636 /// // Circuit is now Closed again
637 /// ```
638 pub fn reset(&self) {
639 let old_state = self.state();
640 self.state
641 .store(CircuitState::Closed as u8, Ordering::Release);
642 self.failure_count.store(0, Ordering::Release);
643 self.success_count.store(0, Ordering::Release);
644 self.last_failure_time.store(0, Ordering::Release);
645
646 if old_state != CircuitState::Closed {
647 info!(
648 from = ?old_state,
649 "Circuit breaker: Manual reset to Closed state"
650 );
651 self.emit_event(CircuitBreakerEvent::StateChanged {
652 from: old_state,
653 to: CircuitState::Closed,
654 });
655 }
656 }
657
658 /// Transitions to a new state and emits an event.
659 fn transition_to(&self, new_state: CircuitState) {
660 let old_state = self.state();
661 self.state.store(new_state as u8, Ordering::Release);
662
663 self.emit_event(CircuitBreakerEvent::StateChanged {
664 from: old_state,
665 to: new_state,
666 });
667 }
668
669 /// Emits an event if an event channel is configured.
670 fn emit_event(&self, event: CircuitBreakerEvent) {
671 if let Some(ref tx) = self.event_tx {
672 // Ignore send errors (receiver may have been dropped)
673 let _ = tx.send(event);
674 }
675 }
676}
677
678#[cfg(test)]
679#[allow(clippy::disallowed_methods)] // unwrap() is acceptable in tests
680mod tests {
681 use super::*;
682
683 #[test]
684 fn test_circuit_state_from_u8() {
685 assert_eq!(CircuitState::from_u8(0), CircuitState::Closed);
686 assert_eq!(CircuitState::from_u8(1), CircuitState::Open);
687 assert_eq!(CircuitState::from_u8(2), CircuitState::HalfOpen);
688 assert_eq!(CircuitState::from_u8(255), CircuitState::Closed); // Invalid defaults to Closed
689 }
690
691 #[test]
692 fn test_circuit_breaker_config_default() {
693 let config = CircuitBreakerConfig::default();
694 assert_eq!(config.failure_threshold, 5);
695 assert_eq!(config.reset_timeout, Duration::from_secs(30));
696 assert_eq!(config.success_threshold, 1);
697 }
698
699 #[test]
700 fn test_circuit_breaker_config_new() {
701 let config = CircuitBreakerConfig::new(10, Duration::from_secs(60), 2);
702 assert_eq!(config.failure_threshold, 10);
703 assert_eq!(config.reset_timeout, Duration::from_secs(60));
704 assert_eq!(config.success_threshold, 2);
705 }
706
707 #[test]
708 fn test_circuit_breaker_config_validate_default() {
709 let config = CircuitBreakerConfig::default();
710 let result = config.validate();
711 assert!(result.is_ok());
712 assert!(result.unwrap().warnings.is_empty());
713 }
714
715 #[test]
716 fn test_circuit_breaker_config_validate_zero_failure_threshold() {
717 let config = CircuitBreakerConfig {
718 failure_threshold: 0,
719 ..Default::default()
720 };
721 let result = config.validate();
722 assert!(result.is_err());
723 let err = result.unwrap_err();
724 assert_eq!(err.field_name(), "failure_threshold");
725 }
726
727 #[test]
728 fn test_circuit_breaker_config_validate_zero_success_threshold() {
729 let config = CircuitBreakerConfig {
730 success_threshold: 0,
731 ..Default::default()
732 };
733 let result = config.validate();
734 assert!(result.is_err());
735 let err = result.unwrap_err();
736 assert_eq!(err.field_name(), "success_threshold");
737 }
738
739 #[test]
740 fn test_circuit_breaker_config_validate_short_reset_timeout_warning() {
741 let config = CircuitBreakerConfig {
742 reset_timeout: Duration::from_millis(500),
743 ..Default::default()
744 };
745 let result = config.validate();
746 assert!(result.is_ok());
747 let validation_result = result.unwrap();
748 assert!(!validation_result.warnings.is_empty());
749 assert!(validation_result.warnings[0].contains("reset_timeout"));
750 }
751
752 #[test]
753 fn test_circuit_breaker_initial_state() {
754 let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
755 assert_eq!(breaker.state(), CircuitState::Closed);
756 assert_eq!(breaker.failure_count(), 0);
757 assert_eq!(breaker.success_count(), 0);
758 }
759
760 #[test]
761 fn test_circuit_breaker_allow_request_closed() {
762 let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
763 assert!(breaker.allow_request().is_ok());
764 }
765
766 #[test]
767 fn test_circuit_breaker_record_success_closed() {
768 let breaker = CircuitBreaker::new(CircuitBreakerConfig::default());
769
770 // Record some failures first
771 breaker.record_failure();
772 breaker.record_failure();
773 assert_eq!(breaker.failure_count(), 2);
774
775 // Success should reset failure count
776 breaker.record_success();
777 assert_eq!(breaker.failure_count(), 0);
778 assert_eq!(breaker.state(), CircuitState::Closed);
779 }
780
781 #[test]
782 fn test_circuit_breaker_transition_to_open() {
783 let config = CircuitBreakerConfig {
784 failure_threshold: 3,
785 ..Default::default()
786 };
787 let breaker = CircuitBreaker::new(config);
788
789 // Record failures up to threshold
790 breaker.record_failure();
791 assert_eq!(breaker.state(), CircuitState::Closed);
792 breaker.record_failure();
793 assert_eq!(breaker.state(), CircuitState::Closed);
794 breaker.record_failure();
795
796 // Should transition to Open after reaching threshold
797 assert_eq!(breaker.state(), CircuitState::Open);
798 }
799
800 #[test]
801 fn test_circuit_breaker_reject_when_open() {
802 let config = CircuitBreakerConfig {
803 failure_threshold: 1,
804 reset_timeout: Duration::from_secs(60), // Long timeout
805 ..Default::default()
806 };
807 let breaker = CircuitBreaker::new(config);
808
809 // Open the circuit
810 breaker.record_failure();
811 assert_eq!(breaker.state(), CircuitState::Open);
812
813 // Request should be rejected
814 let result = breaker.allow_request();
815 assert!(result.is_err());
816
817 if let Err(e) = result {
818 assert!(e.as_resource_exhausted().is_some());
819 }
820 }
821
822 #[test]
823 fn test_circuit_breaker_transition_to_half_open() {
824 let config = CircuitBreakerConfig {
825 failure_threshold: 1,
826 reset_timeout: Duration::from_millis(10), // Very short timeout for testing
827 ..Default::default()
828 };
829 let breaker = CircuitBreaker::new(config);
830
831 // Open the circuit
832 breaker.record_failure();
833 assert_eq!(breaker.state(), CircuitState::Open);
834
835 // Wait for reset timeout
836 std::thread::sleep(Duration::from_millis(20));
837
838 // Request should be allowed and state should transition to HalfOpen
839 assert!(breaker.allow_request().is_ok());
840 assert_eq!(breaker.state(), CircuitState::HalfOpen);
841 }
842
843 #[test]
844 fn test_circuit_breaker_half_open_success_closes() {
845 let config = CircuitBreakerConfig {
846 failure_threshold: 1,
847 reset_timeout: Duration::from_millis(10),
848 success_threshold: 1,
849 };
850 let breaker = CircuitBreaker::new(config);
851
852 // Open the circuit
853 breaker.record_failure();
854 assert_eq!(breaker.state(), CircuitState::Open);
855
856 // Wait for reset timeout
857 std::thread::sleep(Duration::from_millis(20));
858
859 // Transition to HalfOpen
860 assert!(breaker.allow_request().is_ok());
861 assert_eq!(breaker.state(), CircuitState::HalfOpen);
862
863 // Success should close the circuit
864 breaker.record_success();
865 assert_eq!(breaker.state(), CircuitState::Closed);
866 assert_eq!(breaker.failure_count(), 0);
867 assert_eq!(breaker.success_count(), 0);
868 }
869
870 #[test]
871 fn test_circuit_breaker_half_open_failure_opens() {
872 let config = CircuitBreakerConfig {
873 failure_threshold: 1,
874 reset_timeout: Duration::from_millis(10),
875 success_threshold: 1,
876 };
877 let breaker = CircuitBreaker::new(config);
878
879 // Open the circuit
880 breaker.record_failure();
881 assert_eq!(breaker.state(), CircuitState::Open);
882
883 // Wait for reset timeout
884 std::thread::sleep(Duration::from_millis(20));
885
886 // Transition to HalfOpen
887 assert!(breaker.allow_request().is_ok());
888 assert_eq!(breaker.state(), CircuitState::HalfOpen);
889
890 // Failure should reopen the circuit
891 breaker.record_failure();
892 assert_eq!(breaker.state(), CircuitState::Open);
893 }
894
895 #[test]
896 fn test_circuit_breaker_multiple_successes_required() {
897 let config = CircuitBreakerConfig {
898 failure_threshold: 1,
899 reset_timeout: Duration::from_millis(10),
900 success_threshold: 3, // Require 3 successes
901 };
902 let breaker = CircuitBreaker::new(config);
903
904 // Open the circuit
905 breaker.record_failure();
906 std::thread::sleep(Duration::from_millis(20));
907 assert!(breaker.allow_request().is_ok());
908 assert_eq!(breaker.state(), CircuitState::HalfOpen);
909
910 // First success - still HalfOpen
911 breaker.record_success();
912 assert_eq!(breaker.state(), CircuitState::HalfOpen);
913 assert_eq!(breaker.success_count(), 1);
914
915 // Second success - still HalfOpen
916 breaker.record_success();
917 assert_eq!(breaker.state(), CircuitState::HalfOpen);
918 assert_eq!(breaker.success_count(), 2);
919
920 // Third success - should close
921 breaker.record_success();
922 assert_eq!(breaker.state(), CircuitState::Closed);
923 }
924
925 #[test]
926 fn test_circuit_breaker_reset() {
927 let config = CircuitBreakerConfig {
928 failure_threshold: 1,
929 ..Default::default()
930 };
931 let breaker = CircuitBreaker::new(config);
932
933 // Open the circuit
934 breaker.record_failure();
935 assert_eq!(breaker.state(), CircuitState::Open);
936
937 // Reset should close the circuit
938 breaker.reset();
939 assert_eq!(breaker.state(), CircuitState::Closed);
940 assert_eq!(breaker.failure_count(), 0);
941 assert_eq!(breaker.success_count(), 0);
942 }
943
944 #[tokio::test]
945 async fn test_circuit_breaker_with_events() {
946 let (tx, mut rx) = mpsc::unbounded_channel::<CircuitBreakerEvent>();
947 let config = CircuitBreakerConfig {
948 failure_threshold: 2,
949 reset_timeout: Duration::from_millis(10),
950 success_threshold: 1,
951 };
952 let breaker = CircuitBreaker::with_events(config, tx);
953
954 // Record failures
955 breaker.record_failure();
956 breaker.record_failure(); // This should trigger state change
957
958 // Check events
959 let event1 = rx.recv().await.unwrap();
960 assert!(matches!(
961 event1,
962 CircuitBreakerEvent::FailureRecorded { count: 1 }
963 ));
964
965 let event2 = rx.recv().await.unwrap();
966 assert!(matches!(
967 event2,
968 CircuitBreakerEvent::FailureRecorded { count: 2 }
969 ));
970
971 let event3 = rx.recv().await.unwrap();
972 assert!(matches!(
973 event3,
974 CircuitBreakerEvent::StateChanged {
975 from: CircuitState::Closed,
976 to: CircuitState::Open
977 }
978 ));
979 }
980
981 #[test]
982 fn test_circuit_breaker_thread_safety() {
983 use std::sync::Arc;
984 use std::thread;
985
986 let breaker = Arc::new(CircuitBreaker::new(CircuitBreakerConfig {
987 failure_threshold: 100,
988 ..Default::default()
989 }));
990
991 let mut handles = vec![];
992
993 // Spawn multiple threads recording failures
994 for _ in 0..10 {
995 let breaker_clone = Arc::clone(&breaker);
996 handles.push(thread::spawn(move || {
997 for _ in 0..10 {
998 breaker_clone.record_failure();
999 }
1000 }));
1001 }
1002
1003 // Wait for all threads to complete
1004 for handle in handles {
1005 handle.join().unwrap();
1006 }
1007
1008 // Should have recorded 100 failures and transitioned to Open
1009 assert_eq!(breaker.state(), CircuitState::Open);
1010 }
1011}