use crate::error::{CoreError, CoreResult, ErrorContext};
use crate::random::Rng;
use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
/// The three states a [`CircuitBreaker`] can be in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CircuitState {
/// Requests are admitted; failures are counted towards opening the circuit.
Closed,
/// Requests are rejected until the configured recovery timeout has elapsed.
Open,
/// A limited number of trial requests is admitted to probe whether the
/// protected operation has recovered.
HalfOpen,
}
/// Renders the state as its lowercase name: `closed`, `open` or `half-open`.
impl fmt::Display for CircuitState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            CircuitState::Closed => "closed",
            CircuitState::Open => "open",
            CircuitState::HalfOpen => "half-open",
        };
        f.write_str(label)
    }
}
/// Tuning parameters for a [`CircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
/// Number of failures recorded inside `failure_window` at which a closed
/// circuit opens.
pub failure_threshold: usize,
/// How far back recorded failures are kept; older entries are discarded
/// whenever a new failure is recorded.
pub failure_window: Duration,
/// Time an open circuit waits after its last state change before the next
/// request moves it to half-open.
pub recoverytimeout: Duration,
/// Number of successful half-open requests required to close the circuit.
pub success_threshold: usize,
/// A half-open circuit admits a request only while fewer than this many
/// half-open requests are in flight.
pub max_half_open_requests: usize,
/// The circuit never opens while the total number of admitted requests is
/// below this value.
pub minimum_request_threshold: usize,
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: 5,
failure_window: Duration::from_secs(60),
recoverytimeout: Duration::from_secs(30),
success_threshold: 3,
max_half_open_requests: 2,
minimum_request_threshold: 10,
}
}
}
/// A named circuit breaker that guards an operation against repeated failures.
///
/// All bookkeeping lives behind locks or atomics, so every method takes `&self`.
pub struct CircuitBreaker {
// Used in the rejection error and in the state-transition log lines.
name: String,
// Current state of the circuit.
state: RwLock<CircuitState>,
// Thresholds and timeouts this breaker was built with.
config: CircuitBreakerConfig,
// Incremented on every recorded failure; zeroed by a success while closed,
// when the circuit closes, and by `reset`.
failure_count: AtomicUsize,
// Successes counted while half-open; zeroed on entering half-open, on
// closing, and by `reset`.
success_count: AtomicUsize,
// Total number of admitted requests; only `reset` clears it.
request_count: AtomicUsize,
// Number of requests currently executing that were admitted while half-open.
half_open_requests: AtomicUsize,
// Instant of the most recent recorded failure, if any.
last_failure_time: Mutex<Option<Instant>>,
// Instant of construction, the latest state transition, or the latest reset.
last_state_change: Mutex<Instant>,
// Timestamps of recorded failures, pruned to `config.failure_window`.
failure_history: Mutex<Vec<Instant>>,
}
impl CircuitBreaker {
pub fn new(name: String) -> Self {
Self::with_config(name, CircuitBreakerConfig::default())
}
pub fn with_config(name: String, config: CircuitBreakerConfig) -> Self {
Self {
name,
state: RwLock::new(CircuitState::Closed),
config,
failure_count: AtomicUsize::new(0),
success_count: AtomicUsize::new(0),
request_count: AtomicUsize::new(0),
half_open_requests: AtomicUsize::new(0),
last_failure_time: Mutex::new(None),
last_state_change: Mutex::new(Instant::now()),
failure_history: Mutex::new(Vec::new()),
}
}
pub fn execute<F, T>(&self, operation: F) -> CoreResult<T>
where
F: FnOnce() -> CoreResult<T>,
{
if !self.should_allow_request()? {
return Err(CoreError::ComputationError(ErrorContext::new(format!(
"Circuit breaker '{name}' is open - rejecting request",
name = self.name
))));
}
self.request_count.fetch_add(1, Ordering::Relaxed);
let is_half_open = {
let state = self.state.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new(
"Failed to read circuit breaker state",
))
})?;
*state == CircuitState::HalfOpen
};
if is_half_open {
self.half_open_requests.fetch_add(1, Ordering::Relaxed);
}
let result = operation();
match &result {
Ok(_) => self.record_success()?,
Err(_) => self.record_failure()?,
}
if is_half_open {
self.half_open_requests.fetch_sub(1, Ordering::Relaxed);
}
result
}
fn should_allow_request(&self) -> CoreResult<bool> {
let state = self.state.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read circuit breaker state"))
})?;
match *state {
CircuitState::Closed => Ok(true),
CircuitState::Open => {
if let Ok(last_change) = self.last_state_change.lock() {
if last_change.elapsed() >= self.config.recoverytimeout {
drop(state); self.transition_to_half_open()?;
return Ok(true);
}
}
Ok(false)
}
CircuitState::HalfOpen => {
let current_half_open = self.half_open_requests.load(Ordering::Relaxed);
Ok(current_half_open < self.config.max_half_open_requests)
}
}
}
fn record_success(&self) -> CoreResult<()> {
let state = self.state.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read circuit breaker state"))
})?;
match *state {
CircuitState::Closed => {
self.failure_count.store(0, Ordering::Relaxed);
}
CircuitState::HalfOpen => {
let success_count = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
if success_count >= self.config.success_threshold {
drop(state); self.transition_to_closed()?;
}
}
CircuitState::Open => {
}
}
Ok(())
}
fn record_failure(&self) -> CoreResult<()> {
{
let mut history = self.failure_history.lock().map_err(|_| {
CoreError::ComputationError(ErrorContext::new(
"Failed to acquire failure history lock",
))
})?;
let now = Instant::now();
history.push(now);
let cutoff = now - self.config.failure_window;
history.retain(|&failure_time| failure_time > cutoff);
}
self.failure_count.fetch_add(1, Ordering::Relaxed);
if let Ok(mut last_failure) = self.last_failure_time.lock() {
*last_failure = Some(Instant::now());
}
let state = self.state.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read circuit breaker state"))
})?;
match *state {
CircuitState::Closed => {
if self.should_open_circuit()? {
drop(state); self.transition_to_open()?;
}
}
CircuitState::HalfOpen => {
drop(state); self.transition_to_open()?;
}
CircuitState::Open => {
}
}
Ok(())
}
fn should_open_circuit(&self) -> CoreResult<bool> {
let request_count = self.request_count.load(Ordering::Relaxed);
if request_count < self.config.minimum_request_threshold {
return Ok(false);
}
let history = self.failure_history.lock().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to acquire failure history lock"))
})?;
let recent_failures = history.len();
Ok(recent_failures >= self.config.failure_threshold)
}
fn transition_to_open(&self) -> CoreResult<()> {
let mut state = self.state.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to write circuit breaker state"))
})?;
*state = CircuitState::Open;
if let Ok(mut last_change) = self.last_state_change.lock() {
*last_change = Instant::now();
}
eprintln!(
"Circuit breaker '{name}' opened due to failures",
name = self.name
);
Ok(())
}
fn transition_to_half_open(&self) -> CoreResult<()> {
let mut state = self.state.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to write circuit breaker state"))
})?;
*state = CircuitState::HalfOpen;
self.success_count.store(0, Ordering::Relaxed);
if let Ok(mut last_change) = self.last_state_change.lock() {
*last_change = Instant::now();
}
println!(
"Circuit breaker '{name}' moved to half-open state",
name = self.name
);
Ok(())
}
fn transition_to_closed(&self) -> CoreResult<()> {
let mut state = self.state.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to write circuit breaker state"))
})?;
*state = CircuitState::Closed;
self.failure_count.store(0, Ordering::Relaxed);
self.success_count.store(0, Ordering::Relaxed);
if let Ok(mut last_change) = self.last_state_change.lock() {
*last_change = Instant::now();
}
if let Ok(mut history) = self.failure_history.lock() {
history.clear();
}
println!(
"Circuit breaker '{name}' closed - service recovered",
name = self.name
);
Ok(())
}
pub fn status(&self) -> CoreResult<CircuitBreakerStatus> {
let state = self.state.read().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read circuit breaker state"))
})?;
let failure_count = self.failure_count.load(Ordering::Relaxed);
let success_count = self.success_count.load(Ordering::Relaxed);
let request_count = self.request_count.load(Ordering::Relaxed);
let half_open_requests = self.half_open_requests.load(Ordering::Relaxed);
let last_failure_time = self
.last_failure_time
.lock()
.map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read last failure time"))
})?
.map(|instant| SystemTime::now() - instant.elapsed());
let last_state_change = self
.last_state_change
.lock()
.map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to read last state change"))
})?
.elapsed();
Ok(CircuitBreakerStatus {
name: self.name.clone(),
state: state.clone(),
failure_count,
success_count,
request_count,
half_open_requests,
last_failure_time,
last_state_change,
})
}
pub fn reset(&self) -> CoreResult<()> {
let mut state = self.state.write().map_err(|_| {
CoreError::ComputationError(ErrorContext::new("Failed to write circuit breaker state"))
})?;
*state = CircuitState::Closed;
self.failure_count.store(0, Ordering::Relaxed);
self.success_count.store(0, Ordering::Relaxed);
self.request_count.store(0, Ordering::Relaxed);
self.half_open_requests.store(0, Ordering::Relaxed);
if let Ok(mut last_failure) = self.last_failure_time.lock() {
*last_failure = None;
}
if let Ok(mut last_change) = self.last_state_change.lock() {
*last_change = Instant::now();
}
if let Ok(mut history) = self.failure_history.lock() {
history.clear();
}
println!("Circuit breaker '{name}' manually reset", name = self.name);
Ok(())
}
}
/// Point-in-time snapshot of a [`CircuitBreaker`], as returned by
/// `CircuitBreaker::status`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStatus {
/// Name of the circuit breaker.
pub name: String,
/// State at the time of the snapshot.
pub state: CircuitState,
/// Value of the breaker's failure counter.
pub failure_count: usize,
/// Value of the breaker's half-open success counter.
pub success_count: usize,
/// Total number of admitted requests.
pub request_count: usize,
/// Number of half-open requests in flight.
pub half_open_requests: usize,
/// Wall-clock time of the last failure, computed as "now minus the time
/// elapsed since the failure"; `None` if no failure is recorded.
pub last_failure_time: Option<SystemTime>,
/// Time elapsed since the last state change (a duration, not a timestamp).
pub last_state_change: Duration,
}
impl fmt::Display for CircuitBreakerStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Circuit Breaker: {}", self.name)?;
writeln!(f, " State: {}", self.state)?;
writeln!(f, " Failures: {}", self.failure_count)?;
writeln!(f, " Successes: {}", self.success_count)?;
writeln!(f, " Total Requests: {}", self.request_count)?;
writeln!(f, " Half-open Requests: {}", self.half_open_requests)?;
if let Some(last_failure) = self.last_failure_time {
writeln!(f, " Last Failure: {last_failure:?}")?;
}
writeln!(f, " Last State Change: {:?} ago", self.last_state_change)?;
Ok(())
}
}
/// Settings that control how a [`RetryExecutor`] retries a failing operation.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
/// Maximum number of times the operation is invoked (the first call included).
pub max_attempts: usize,
/// Delay before the first retry, before jitter is applied.
pub basedelay: Duration,
/// Upper bound applied to every computed delay.
pub maxdelay: Duration,
/// Factor the delay is multiplied by for each further attempt
/// (`basedelay * backoff_multiplier^attempt`).
pub backoff_multiplier: f64,
/// Fraction of the exponential delay used as the half-width of the random
/// jitter added to it.
pub jitter: f64,
/// Names of the error kinds that may be retried. Only `"ComputationError"`,
/// `"TimeoutError"` and `"IoError"` are ever matched against this list.
pub retry_on: Vec<String>,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
basedelay: Duration::from_millis(100),
maxdelay: Duration::from_secs(30),
backoff_multiplier: 2.0,
jitter: 0.1,
retry_on: vec![
"ComputationError".to_string(),
"TimeoutError".to_string(),
"IoError".to_string(),
],
}
}
}
/// Runs operations repeatedly according to a [`RetryPolicy`].
pub struct RetryExecutor {
// Policy consulted for the attempt limit, the delays and the retryable error kinds.
policy: RetryPolicy,
}
impl RetryExecutor {
pub fn policy(policy: RetryPolicy) -> Self {
Self { policy }
}
pub fn execute<F, T>(&self, operation: F) -> CoreResult<T>
where
F: Fn() -> CoreResult<T>,
{
let mut lasterror = None;
for attempt in 0..self.policy.max_attempts {
match operation() {
Ok(result) => return Ok(result),
Err(error) => {
if !self.should_retry(&error) {
return Err(error);
}
lasterror = Some(error);
if attempt < self.policy.max_attempts - 1 {
let delay = self.calculatedelay(attempt);
std::thread::sleep(delay);
}
}
}
}
Err(lasterror.unwrap_or_else(|| {
CoreError::ComputationError(ErrorContext::new("All retry attempts failed"))
}))
}
pub async fn execute_async<F, Fut, T>(&self, operation: F) -> CoreResult<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = CoreResult<T>>,
{
let mut lasterror = None;
for attempt in 0..self.policy.max_attempts {
match operation().await {
Ok(result) => return Ok(result),
Err(error) => {
if !self.should_retry(&error) {
return Err(error);
}
lasterror = Some(error);
if attempt < self.policy.max_attempts - 1 {
let delay = self.calculatedelay(attempt);
#[cfg(feature = "async")]
tokio::time::sleep(delay).await;
#[cfg(not(feature = "async"))]
let _unuseddelay = delay; }
}
}
}
Err(lasterror.unwrap_or_else(|| {
CoreError::ComputationError(ErrorContext::new("All retry attempts failed"))
}))
}
fn should_retry(&self, error: &CoreError) -> bool {
let errortype = match error {
CoreError::ComputationError(_) => "ComputationError",
CoreError::TimeoutError(_) => "TimeoutError",
CoreError::IoError(_) => "IoError",
CoreError::MemoryError(_) => return false, _ => return false, };
self.policy.retry_on.contains(&errortype.to_string())
}
fn calculatedelay(&self, attempt: usize) -> Duration {
let mut rng = rand::rng();
let basedelay_ms = self.policy.basedelay.as_millis() as f64;
let exponentialdelay = basedelay_ms * self.policy.backoff_multiplier.powi(attempt as i32);
let jitter_range = exponentialdelay * self.policy.jitter;
let jitter = (rng.random::<f64>() - 0.5) * 2.0 * jitter_range;
let delay_with_jitter = exponentialdelay + jitter;
let finaldelay = delay_with_jitter.min(self.policy.maxdelay.as_millis() as f64);
Duration::from_millis(finaldelay.max(0.0) as u64)
}
}
/// Strategy that produces a substitute result once an operation has failed.
///
/// Implementations must be `Send + Sync`.
pub trait FallbackStrategy<T>: Send + Sync {
/// Produces the fallback outcome for `error`; the fallback may itself fail.
fn error(&self, error: &CoreError) -> CoreResult<T>;
/// Name of this strategy.
fn name(&self) -> &str;
}
/// Fallback that answers every failure with a clone of a fixed value.
pub struct DefaultValueFallback<T> {
// Value cloned and returned for every error.
defaultvalue: T,
// Name reported through `FallbackStrategy::name`.
name: String,
}
impl<T: Clone> DefaultValueFallback<T> {
pub fn value(defaultvalue: T, name: String) -> Self {
Self {
defaultvalue,
name,
}
}
}
impl<T: Clone + Send + Sync> FallbackStrategy<T> for DefaultValueFallback<T> {
fn error(&self, error: &CoreError) -> CoreResult<T> {
Ok(self.defaultvalue.clone())
}
fn name(&self) -> &str {
&self.name
}
}
/// Combines an optional circuit breaker, retry policy and fallback strategy
/// around operations that return `CoreResult<T>`.
pub struct ResilientExecutor<T> {
// Breaker every attempt is routed through, when set.
circuitbreaker: Option<Arc<CircuitBreaker>>,
// Retry driver wrapped around the (possibly breaker-guarded) operation, when set.
retry_executor: Option<RetryExecutor>,
// Consulted with the final error when everything else failed, when set.
fallback_strategy: Option<Box<dyn FallbackStrategy<T>>>,
}
impl<T> ResilientExecutor<T> {
pub fn new() -> Self {
Self {
circuitbreaker: None,
retry_executor: None,
fallback_strategy: None,
}
}
pub fn with_circuitbreaker(mut self, circuitbreaker: Arc<CircuitBreaker>) -> Self {
self.circuitbreaker = Some(circuitbreaker);
self
}
pub fn with_retrypolicy(mut self, retrypolicy: RetryPolicy) -> Self {
self.retry_executor = Some(RetryExecutor::policy(retrypolicy));
self
}
pub fn with_fallback(mut self, fallback: Box<dyn FallbackStrategy<T>>) -> Self {
self.fallback_strategy = Some(fallback);
self
}
pub fn execute<F>(&self, operation: F) -> CoreResult<T>
where
F: Fn() -> CoreResult<T> + Clone,
{
let final_operation = || {
if let Some(cb) = &self.circuitbreaker {
cb.execute(operation.clone())
} else {
operation()
}
};
let result = if let Some(retry) = &self.retry_executor {
retry.execute(final_operation)
} else {
final_operation()
};
match result {
Ok(value) => Ok(value),
Err(error) => {
if let Some(fallback) = &self.fallback_strategy {
fallback.error(&error)
} else {
Err(error)
}
}
}
}
}
/// Same as `ResilientExecutor::new`: no circuit breaker, retry policy or fallback.
impl<T> Default for ResilientExecutor<T> {
fn default() -> Self {
Self::new()
}
}
// Process-wide map from circuit breaker name to the shared instance, created
// empty on first access.
static CIRCUIT_BREAKER_REGISTRY: std::sync::LazyLock<RwLock<HashMap<String, Arc<CircuitBreaker>>>> =
std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
/// Returns the circuit breaker registered under `name`, creating it with the
/// default configuration on first use.
///
/// Repeated calls with the same name return the same shared instance.
///
/// # Errors
///
/// Returns a `ComputationError` when the registry lock is poisoned.
#[allow(dead_code)]
pub fn get_circuitbreaker(name: &str) -> CoreResult<Arc<CircuitBreaker>> {
    // Common case: the breaker already exists and a read lock is enough. The
    // read guard is confined to this scope so it is gone before the write
    // lock below is requested.
    {
        let registry = CIRCUIT_BREAKER_REGISTRY.read().map_err(|_| {
            CoreError::ComputationError(ErrorContext::new(
                "Failed to read circuit breaker registry",
            ))
        })?;
        if let Some(existing) = registry.get(name) {
            return Ok(Arc::clone(existing));
        }
    }

    let mut registry = CIRCUIT_BREAKER_REGISTRY.write().map_err(|_| {
        CoreError::ComputationError(ErrorContext::new(
            "Failed to write circuit breaker registry",
        ))
    })?;
    // Another thread may have registered the breaker between the two locks;
    // the entry API keeps that instance and only creates one when still missing.
    let breaker = registry
        .entry(name.to_string())
        .or_insert_with(|| Arc::new(CircuitBreaker::new(name.to_string())));
    Ok(Arc::clone(breaker))
}
/// Returns a status snapshot for every registered circuit breaker, in the
/// registry's (unspecified) iteration order.
///
/// # Errors
///
/// Returns a `ComputationError` when the registry lock is poisoned, or the
/// first error produced while reading a breaker's status.
#[allow(dead_code)]
pub fn list_circuitbreakers() -> CoreResult<Vec<CircuitBreakerStatus>> {
    let registry = CIRCUIT_BREAKER_REGISTRY.read().map_err(|_| {
        CoreError::ComputationError(ErrorContext::new("Failed to read circuit breaker registry"))
    })?;
    // Collecting into a `Result` stops at the first failing snapshot.
    registry.values().map(|breaker| breaker.status()).collect()
}
/// Runs `$operation` through the circuit breaker registered under `$name`.
///
/// Expands to a `get_circuitbreaker` lookup followed by `execute`. The lookup
/// uses `?`, so the macro can only be used inside a function whose return
/// type can absorb that error.
#[macro_export]
macro_rules! with_circuitbreaker {
($name:expr, $operation:expr) => {{
// Look the breaker up by name; `?` propagates a failed lookup to the caller.
let cb = $crate::error::circuitbreaker::get_circuitbreaker($name)?;
// The expression is wrapped in a closure so the breaker decides whether it runs.
cb.execute(|| $operation)
}};
}
/// Runs `$operation` under a `RetryExecutor`.
///
/// * `with_retry!(operation)` uses `RetryPolicy::default()`.
/// * `with_retry!(policy, operation)` uses the given policy.
///
/// The operation expression is wrapped in a closure and may therefore be
/// evaluated several times.
#[macro_export]
macro_rules! with_retry {
// One argument: retry with the default policy.
($operation:expr) => {{
let retry_executor = $crate::error::circuitbreaker::RetryExecutor::policy(
$crate::error::circuitbreaker::RetryPolicy::default(),
);
retry_executor.execute(|| $operation)
}};
// Two arguments: retry with the caller-supplied policy.
($policy:expr, $operation:expr) => {{
let retry_executor = $crate::error::circuitbreaker::RetryExecutor::policy($policy);
retry_executor.execute(|| $operation)
}};
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Ten consecutive failures with the default configuration must open the
    /// circuit.
    #[test]
    fn test_circuitbreaker_states() {
        let breaker = CircuitBreaker::new("test".to_string());
        assert_eq!(
            breaker.status().expect("Operation failed").state,
            CircuitState::Closed
        );

        (0..10).for_each(|_| {
            let _ = breaker.execute(|| -> CoreResult<()> {
                Err(CoreError::ComputationError(ErrorContext::new("test error")))
            });
        });

        assert_eq!(
            breaker.status().expect("Operation failed").state,
            CircuitState::Open
        );
    }

    /// An operation that fails twice and then succeeds is called exactly
    /// three times under a three-attempt policy.
    #[test]
    fn test_circuitbreaker_retry_executor() {
        let retry_executor = RetryExecutor::policy(RetryPolicy {
            max_attempts: 3,
            basedelay: Duration::from_millis(1),
            ..Default::default()
        });

        // `Cell` lets the `Fn` closure count its own invocations.
        let attempts = std::cell::Cell::new(0);
        let execute_result = retry_executor.execute(|| {
            attempts.set(attempts.get() + 1);
            if attempts.get() < 3 {
                Err(CoreError::ComputationError(ErrorContext::new("retry test")))
            } else {
                Ok("success")
            }
        });

        assert!(execute_result.is_ok());
        assert_eq!(execute_result.expect("Operation failed"), "success");
        assert_eq!(attempts.get(), 3);
    }

    /// The default-value fallback turns any error into its stored value.
    #[test]
    fn test_fallback_strategy() {
        let fallback = DefaultValueFallback::value(42, "test_fallback".to_string());
        let error = CoreError::ComputationError(ErrorContext::new("test error"));

        let recovered = fallback.error(&error).expect("Operation failed");

        assert_eq!(recovered, 42);
    }

    /// A failing operation behind a breaker still yields the fallback value.
    #[test]
    fn test_resilient_executor() {
        let executor = ResilientExecutor::new()
            .with_circuitbreaker(Arc::new(CircuitBreaker::new("test".to_string())))
            .with_fallback(Box::new(DefaultValueFallback::value(
                "fallback",
                "test".to_string(),
            )));

        let result = executor.execute(|| -> CoreResult<&str> {
            Err(CoreError::ComputationError(ErrorContext::new("test error")))
        });

        assert!(result.is_ok());
        assert_eq!(result.expect("Operation failed"), "fallback");
    }

    /// The registry hands out one shared instance per name.
    #[test]
    fn test_circuitbreaker_registry() {
        let first = get_circuitbreaker("test1").expect("Operation failed");
        let again = get_circuitbreaker("test1").expect("Operation failed");
        assert!(Arc::ptr_eq(&first, &again));

        let other = get_circuitbreaker("test2").expect("Operation failed");
        assert!(!Arc::ptr_eq(&first, &other));
    }
}