use std::{
sync::Mutex,
time::{Duration, Instant},
};
/// The three states a [`CircuitBreaker`] can be in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Calls are executed; failures are counted toward the failure threshold.
    Closed,
    /// Calls are rejected without being executed until the cooldown elapses.
    Open,
    /// Entered from `Open` once the cooldown has elapsed; calls are executed
    /// and successes are counted toward the success threshold.
    HalfOpen,
}
/// Tunables controlling when a [`CircuitBreaker`] opens and closes.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failure count at which the breaker trips to `Open`. The counter is
    /// cleared by any success recorded while the breaker is `Closed`.
    pub failure_threshold: u32,
    /// Number of successes recorded while `HalfOpen` that closes the breaker.
    pub success_threshold: u32,
    /// Minimum time spent `Open` before the breaker becomes `HalfOpen`.
    pub cooldown: Duration,
}

impl Default for CircuitBreakerConfig {
    /// Five failures to open, two successes to close, and a 30-second cooldown.
    fn default() -> Self {
        let cooldown = Duration::from_secs(30);
        Self {
            failure_threshold: 5,
            success_threshold: 2,
            cooldown,
        }
    }
}
/// Mutable breaker bookkeeping, kept behind the mutex in [`CircuitBreaker`].
#[derive(Debug)]
struct CircuitBreakerInner {
    /// Current position in the Closed / Open / HalfOpen cycle.
    state: CircuitState,
    /// Failures recorded since the counter was last cleared.
    failure_count: u32,
    /// Successes recorded during the current `HalfOpen` phase.
    success_count: u32,
    /// When the breaker last tripped to `Open`; `None` until the first trip
    /// and again after a reset. The cooldown is measured from this instant.
    last_failure_at: Option<Instant>,
}
/// A circuit breaker that wraps fallible async operations.
///
/// The configuration is fixed at construction; all mutable state lives behind
/// a [`Mutex`], so the breaker is driven through `&self`.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Thresholds and cooldown supplied to [`CircuitBreaker::new`].
    config: CircuitBreakerConfig,
    /// State and counters, guarded by a mutex.
    inner: Mutex<CircuitBreakerInner>,
}
impl CircuitBreaker {
    /// Creates a breaker that starts `Closed` with all counters at zero.
    #[must_use]
    pub fn new(config: CircuitBreakerConfig) -> Self {
        let inner = Mutex::new(Self::closed_inner());
        Self { config, inner }
    }

    /// Returns the current state.
    ///
    /// If the breaker is `Open` and the cooldown has elapsed, it is moved to
    /// `HalfOpen` first, so the returned value reflects that transition.
    pub fn state(&self) -> CircuitState {
        let mut guard = self.lock_inner();
        Self::maybe_transition_to_half_open(&mut guard, &self.config);
        guard.state
    }

    /// Runs `f` through the breaker and records its outcome.
    ///
    /// # Errors
    ///
    /// * [`CircuitBreakerError::CircuitOpen`] if the breaker is `Open` (after
    ///   the cooldown check); `f` is not invoked in that case.
    /// * [`CircuitBreakerError::Inner`] wrapping the error produced by `f`.
    pub async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitBreakerError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // `state()` takes and releases the lock internally, so no guard is
        // held while the operation's future is awaited.
        if self.state() == CircuitState::Open {
            return Err(CircuitBreakerError::CircuitOpen);
        }

        let outcome = f().await;
        match &outcome {
            Ok(_) => self.on_success(),
            Err(_) => self.on_failure(),
        }
        outcome.map_err(CircuitBreakerError::Inner)
    }

    /// Forces the breaker back to `Closed` and clears all counters.
    pub fn reset(&self) {
        *self.lock_inner() = Self::closed_inner();
    }

    /// Bookkeeping for a freshly closed breaker.
    fn closed_inner() -> CircuitBreakerInner {
        CircuitBreakerInner {
            state: CircuitState::Closed,
            failure_count: 0,
            success_count: 0,
            last_failure_at: None,
        }
    }

    /// Locks the inner state, recovering the guard if the mutex is poisoned
    /// instead of panicking.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, CircuitBreakerInner> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Moves an `Open` breaker to `HalfOpen` once `config.cooldown` has passed
    /// since it tripped, restarting the half-open success count.
    fn maybe_transition_to_half_open(
        inner: &mut CircuitBreakerInner,
        config: &CircuitBreakerConfig,
    ) {
        if inner.state != CircuitState::Open {
            return;
        }
        let cooled_down = inner
            .last_failure_at
            .is_some_and(|tripped_at| tripped_at.elapsed() >= config.cooldown);
        if cooled_down {
            inner.state = CircuitState::HalfOpen;
            inner.success_count = 0;
        }
    }

    /// Records a successful call.
    fn on_success(&self) {
        let mut guard = self.lock_inner();
        match guard.state {
            // The breaker tripped while this call was in flight; ignore it.
            CircuitState::Open => {}
            // Any success while closed clears the failure counter.
            CircuitState::Closed => guard.failure_count = 0,
            CircuitState::HalfOpen => {
                guard.success_count = guard.success_count.saturating_add(1);
                if guard.success_count >= self.config.success_threshold {
                    guard.state = CircuitState::Closed;
                    guard.failure_count = 0;
                    guard.success_count = 0;
                }
            }
        }
    }

    /// Records a failed call, tripping the breaker once the failure count
    /// reaches the configured threshold.
    fn on_failure(&self) {
        let mut guard = self.lock_inner();
        match guard.state {
            // Already open: nothing to count.
            CircuitState::Open => {}
            CircuitState::HalfOpen | CircuitState::Closed => {
                // The counter is not cleared when the breaker trips, so a
                // failure in `HalfOpen` is already at or past the threshold
                // and reopens the breaker immediately.
                guard.failure_count = guard.failure_count.saturating_add(1);
                if guard.failure_count >= self.config.failure_threshold {
                    guard.state = CircuitState::Open;
                    guard.last_failure_at = Some(Instant::now());
                }
            }
        }
    }
}
/// Error returned by [`CircuitBreaker::call`].
///
/// `Debug` is derived; it requires `E: Debug` and prints `CircuitOpen` or
/// `Inner(..)`.
#[derive(Debug)]
pub enum CircuitBreakerError<E> {
    /// The breaker was open, so the operation was not attempted.
    CircuitOpen,
    /// The operation ran and failed with this error.
    Inner(E),
}

impl<E> std::fmt::Display for CircuitBreakerError<E>
where
    E: std::fmt::Display,
{
    /// Writes a fixed message for `CircuitOpen`, or the wrapped error's own
    /// `Display` output for `Inner`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Inner(source) => write!(f, "{source}"),
            Self::CircuitOpen => f.write_str("circuit breaker is open"),
        }
    }
}

impl<E> std::error::Error for CircuitBreakerError<E> where E: std::fmt::Debug + std::fmt::Display {}

impl<E> CircuitBreakerError<E> {
    /// Returns a stable string code identifying the variant.
    #[must_use]
    pub fn code(&self) -> &'static str {
        match self {
            Self::Inner(_) => "BOB_CIRCUIT_INNER",
            Self::CircuitOpen => "BOB_CIRCUIT_OPEN",
        }
    }
}
/// Exponential-backoff parameters for retrying an operation.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries. Not consulted by
    /// [`RetryConfig::delay_for_attempt`]; enforcing it is left to the caller.
    pub max_retries: u32,
    /// Delay used for attempt `0`, before any scaling.
    pub initial_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Factor applied once per attempt to grow the delay.
    pub multiplier: f64,
}

impl Default for RetryConfig {
    /// Three retries, starting at 200 ms, doubling each attempt, capped at 10 s.
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay: Duration::from_millis(200),
            max_delay: Duration::from_secs(10),
            multiplier: 2.0,
        }
    }
}

impl RetryConfig {
    /// Computes the backoff delay for the zero-based `attempt`:
    /// `initial_delay * multiplier^attempt`, capped at `max_delay`.
    ///
    /// The computation is done in whole milliseconds, so sub-millisecond parts
    /// of `initial_delay` and `max_delay` are dropped.
    #[must_use]
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        let base = self.initial_delay.as_millis() as f64;
        // A zero base must stay zero. Without this guard, `0 * inf` (once the
        // scale factor overflows) is NaN, which `f64::min` below would turn
        // into `max_delay`.
        if base == 0.0 {
            return Duration::ZERO;
        }

        // Clamp instead of `attempt as i32`: the plain cast wraps attempts
        // above `i32::MAX` to a negative exponent, shrinking the delay instead
        // of saturating at the cap.
        let exponent = attempt.min(i32::MAX as u32) as i32;
        let scaled = base * self.multiplier.powi(exponent);
        let capped = scaled.min(self.max_delay.as_millis() as f64);
        // Float-to-int `as` saturates, so negative values become 0.
        Duration::from_millis(capped as u64)
    }
}
/// Coarse health classification attached to a [`ComponentHealth`].
///
/// NOTE(review): nothing in this file interprets the variants; the exact
/// criteria for each level are presumably decided by whoever produces the
/// report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// No problem reported.
    Healthy,
    /// Presumably working with reduced capability; confirm with producers.
    Degraded,
    /// Presumably not working; confirm with producers.
    Unhealthy,
}
/// Health report for a single named component.
#[derive(Debug, Clone)]
pub struct ComponentHealth {
    /// Name identifying the component.
    pub name: String,
    /// Reported health level.
    pub status: HealthStatus,
    /// Optional extra text accompanying the status; presumably a
    /// human-readable explanation (not interpreted in this file).
    pub detail: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a breaker from explicit thresholds and cooldown.
    fn breaker(
        failure_threshold: u32,
        success_threshold: u32,
        cooldown: Duration,
    ) -> CircuitBreaker {
        CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold,
            success_threshold,
            cooldown,
        })
    }

    /// Sends one failing call through the breaker and discards the outcome.
    async fn fail_once(cb: &CircuitBreaker, message: &'static str) {
        let _ = cb.call(|| async move { Err::<(), _>(message) }).await;
    }

    /// Sends one call through the breaker whose operation yields `Ok(value)`.
    async fn succeed_with(
        cb: &CircuitBreaker,
        value: i32,
    ) -> Result<i32, CircuitBreakerError<&'static str>> {
        cb.call(|| async move { Ok::<_, &str>(value) }).await
    }

    #[tokio::test]
    async fn circuit_breaker_allows_calls_when_closed() {
        let cb = CircuitBreaker::new(CircuitBreakerConfig::default());
        assert_eq!(cb.state(), CircuitState::Closed);

        let outcome = succeed_with(&cb, 42).await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.expect("should succeed"), 42);
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[tokio::test]
    async fn circuit_breaker_opens_after_threshold() {
        let cb = breaker(3, 1, Duration::from_secs(60));
        for _ in 0..3 {
            fail_once(&cb, "fail").await;
        }
        assert_eq!(cb.state(), CircuitState::Open);

        // With a 60 s cooldown the breaker is still open, so the call is rejected.
        let rejected = succeed_with(&cb, 42).await;
        assert!(matches!(rejected, Err(CircuitBreakerError::CircuitOpen)));
    }

    #[tokio::test]
    async fn circuit_breaker_half_open_after_cooldown() {
        let cb = breaker(1, 1, Duration::from_millis(50));
        fail_once(&cb, "fail").await;
        assert_eq!(cb.state(), CircuitState::Open);

        // Sleep past the 50 ms cooldown.
        tokio::time::sleep(Duration::from_millis(80)).await;
        assert_eq!(cb.state(), CircuitState::HalfOpen);
    }

    #[tokio::test]
    async fn circuit_breaker_closes_after_success_in_half_open() {
        let cb = breaker(1, 1, Duration::from_millis(10));
        fail_once(&cb, "fail").await;
        tokio::time::sleep(Duration::from_millis(20)).await;

        let outcome = succeed_with(&cb, 42).await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.expect("should succeed"), 42);
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[tokio::test]
    async fn circuit_breaker_reopens_on_failure_in_half_open() {
        let cb = breaker(1, 2, Duration::from_millis(10));
        fail_once(&cb, "fail").await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(cb.state(), CircuitState::HalfOpen);

        fail_once(&cb, "fail again").await;
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[tokio::test]
    async fn circuit_breaker_reset() {
        let cb = breaker(1, 1, Duration::from_secs(60));
        fail_once(&cb, "fail").await;
        assert!(succeed_with(&cb, 1).await.is_err());

        cb.reset();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn retry_config_delay_computation() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(5),
            multiplier: 2.0,
        };
        // (attempt, expected delay); the last row hits the `max_delay` cap.
        let expectations = [
            (0, Duration::from_millis(100)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (10, Duration::from_secs(5)),
        ];
        for &(attempt, expected) in &expectations {
            assert_eq!(config.delay_for_attempt(attempt), expected);
        }
    }

    #[test]
    fn retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.delay_for_attempt(0), Duration::from_millis(200));
    }
}