pub mod retry_policy;
pub mod circuit_breaker;
pub mod types;
pub use retry_policy::{
RetryPolicy, RetryStrategy, ExponentialBackoffConfig, LinearBackoffConfig, FixedDelayConfig
};
pub use circuit_breaker::{
CircuitBreakerPolicy, CircuitBreakerState, CircuitBreakerStatus
};
pub use types::{
ErrorType, RecoveryStats, RecoveryResult, RecoveryError
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
/// Configuration bundle consumed by [`ErrorRecovery::with_config`].
///
/// Groups the retry policy and the circuit breaker policy so both can be
/// supplied to the recovery manager in a single value.
#[derive(Clone, Debug, PartialEq)]
pub struct RecoveryConfig {
    /// Retry policy that becomes the manager's retry decision/delay source.
    pub retry_policy: RetryPolicy,
    /// Circuit breaker policy that the manager wraps in a shared lock.
    pub circuit_breaker: CircuitBreakerPolicy,
}
impl Default for RecoveryConfig {
fn default() -> Self {
Self {
retry_policy: RetryPolicy::default(),
circuit_breaker: CircuitBreakerPolicy::default(),
}
}
}
/// Recovery manager combining a retry policy, a circuit breaker and
/// running statistics.
///
/// The circuit breaker and the statistics live behind `Arc<RwLock<..>>`, so
/// a clone of this value shares both with the original, while the
/// `initialized` flag is copied and then tracked independently per clone.
#[derive(Clone, Debug)]
pub struct ErrorRecovery {
    /// Retry policy consulted for retry decisions and delays.
    policy: RetryPolicy,
    /// Circuit breaker policy, shared and guarded by an async read/write lock.
    circuit_breaker: Arc<RwLock<CircuitBreakerPolicy>>,
    /// Recovery statistics, shared and guarded by an async read/write lock.
    stats: Arc<RwLock<RecoveryStats>>,
    /// Set by `initialize`, cleared by `shutdown`; gates the operational methods.
    initialized: bool,
}
impl ErrorRecovery {
pub fn new() -> Self {
Self {
policy: RetryPolicy::default(),
circuit_breaker: Arc::new(RwLock::new(CircuitBreakerPolicy::new())),
stats: Arc::new(RwLock::new(RecoveryStats::new())),
initialized: false,
}
}
pub fn with_config(config: RecoveryConfig) -> Self {
Self {
policy: config.retry_policy,
circuit_breaker: Arc::new(RwLock::new(config.circuit_breaker)),
stats: Arc::new(RwLock::new(RecoveryStats::new())),
initialized: false,
}
}
pub async fn initialize(&mut self) -> Result<(), RecoveryError> {
if self.initialized {
return Err(RecoveryError::NotInitialized);
}
{
let mut circuit_breaker = self.circuit_breaker.write().await;
let mut status = circuit_breaker.get_status();
circuit_breaker.reset(&mut status);
}
{
let mut stats = self.stats.write().await;
stats.reset();
}
self.initialized = true;
Ok(())
}
pub async fn shutdown(&mut self) -> Result<(), RecoveryError> {
if !self.initialized {
return Err(RecoveryError::NotInitialized);
}
self.initialized = false;
Ok(())
}
pub async fn execute_with_recovery<F, T, E>(
&self,
operation: F,
) -> Result<RecoveryResult<T>, RecoveryError>
where
F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send + 'static>> + Send + Sync,
E: std::fmt::Display + Send + 'static,
{
if !self.initialized {
return Err(RecoveryError::NotInitialized);
}
let start_time = Instant::now();
let mut retry_count = 0;
{
let mut stats = self.stats.write().await;
stats.record_attempt();
}
loop {
{
let circuit_breaker = self.circuit_breaker.read().await;
let status = circuit_breaker.get_status();
if !circuit_breaker.should_allow_request(&status) {
let mut stats = self.stats.write().await;
stats.record_circuit_breaker_trip();
return Ok(RecoveryResult::failure(
"Circuit breaker is open".to_string(),
retry_count,
start_time.elapsed(),
));
}
}
let result = operation().await;
match result {
Ok(value) => {
{
let mut circuit_breaker = self.circuit_breaker.write().await;
let mut status = circuit_breaker.get_status();
circuit_breaker.update_state(&mut status, true);
}
{
let mut stats = self.stats.write().await;
stats.record_success(retry_count, start_time.elapsed());
}
return Ok(RecoveryResult::success(value, retry_count, start_time.elapsed()));
}
Err(error) => {
{
let mut circuit_breaker = self.circuit_breaker.write().await;
let mut status = circuit_breaker.get_status();
circuit_breaker.update_state(&mut status, false);
}
if !self.policy.should_retry(retry_count) {
let mut stats = self.stats.write().await;
stats.record_failure(retry_count, start_time.elapsed(), ErrorType::Unknown);
return Ok(RecoveryResult::failure(
format!("Max retries exceeded: {}", error),
retry_count,
start_time.elapsed(),
));
}
let delay = self.policy.calculate_delay(retry_count + 1);
if delay > Duration::from_millis(0) {
tokio::time::sleep(delay).await;
}
retry_count += 1;
}
}
}
}
pub async fn get_stats(&self) -> RecoveryStats {
self.stats.read().await.clone()
}
pub async fn get_circuit_breaker_status(&self) -> CircuitBreakerStatus {
let circuit_breaker = self.circuit_breaker.read().await;
circuit_breaker.get_status()
}
pub async fn reset_circuit_breaker(&self) -> Result<(), RecoveryError> {
if !self.initialized {
return Err(RecoveryError::NotInitialized);
}
let mut circuit_breaker = self.circuit_breaker.write().await;
let mut status = circuit_breaker.get_status();
circuit_breaker.reset(&mut status);
Ok(())
}
pub fn is_initialized(&self) -> bool {
self.initialized
}
pub fn retry_policy(&self) -> &RetryPolicy {
&self.policy
}
pub async fn circuit_breaker_policy(&self) -> CircuitBreakerPolicy {
self.circuit_breaker.read().await.clone()
}
}
impl Default for ErrorRecovery {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod integration_tests {
    use super::*;

    /// Boxed future shape that `execute_with_recovery` expects from the
    /// operations used in these tests.
    type BoxedOperation = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<String, String>> + Send + 'static>,
    >;

    /// Operation that always resolves to `Ok("success")`.
    fn succeeding_operation() -> BoxedOperation {
        Box::pin(async { Ok::<String, String>("success".to_string()) })
    }

    /// Operation that always resolves to `Err("operation failed")`.
    fn failing_operation() -> BoxedOperation {
        Box::pin(async { Err::<String, String>("operation failed".to_string()) })
    }

    /// Builds a default manager and initializes it.
    async fn initialized_recovery() -> ErrorRecovery {
        let mut recovery = ErrorRecovery::new();
        recovery.initialize().await.unwrap();
        recovery
    }

    #[tokio::test]
    async fn test_error_recovery_creation() {
        let mut recovery = ErrorRecovery::new();
        assert!(!recovery.is_initialized());

        recovery.initialize().await.unwrap();
        assert!(recovery.is_initialized());
    }

    #[tokio::test]
    async fn test_error_recovery_with_config() {
        let mut recovery = ErrorRecovery::with_config(RecoveryConfig::default());
        recovery.initialize().await.unwrap();
        assert!(recovery.is_initialized());
    }

    #[tokio::test]
    async fn test_error_recovery_initialization() {
        let mut recovery = ErrorRecovery::new();

        let first = recovery.initialize().await;
        assert!(first.is_ok());
        assert!(recovery.is_initialized());

        // A second initialization must be rejected.
        let second = recovery.initialize().await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn test_error_recovery_shutdown() {
        let mut recovery = initialized_recovery().await;
        assert!(recovery.is_initialized());

        let outcome = recovery.shutdown().await;
        assert!(outcome.is_ok());
        assert!(!recovery.is_initialized());
    }

    #[tokio::test]
    async fn test_execute_with_recovery_success() {
        let recovery = initialized_recovery().await;

        let result = recovery
            .execute_with_recovery(succeeding_operation)
            .await
            .unwrap();
        assert!(result.is_success());
        assert_eq!(result.clone().unwrap(), "success");
        assert_eq!(result.retry_count, 0);
    }

    #[tokio::test]
    async fn test_execute_with_recovery_failure() {
        let recovery = initialized_recovery().await;

        let result = recovery
            .execute_with_recovery(failing_operation)
            .await
            .unwrap();
        assert!(result.is_failure());
        assert!(result.error_message.unwrap().contains("Max retries exceeded"));
        assert_eq!(result.retry_count, 3);
    }

    #[tokio::test]
    async fn test_execute_with_recovery_before_initialization() {
        let recovery = ErrorRecovery::new();

        let result = recovery.execute_with_recovery(succeeding_operation).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_get_stats() {
        let recovery = initialized_recovery().await;
        recovery
            .execute_with_recovery(succeeding_operation)
            .await
            .unwrap();

        let stats = recovery.get_stats().await;
        assert_eq!(stats.total_attempts, 1);
        assert_eq!(stats.successful_recoveries, 1);
        assert_eq!(stats.failed_recoveries, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_status() {
        let recovery = initialized_recovery().await;

        let status = recovery.get_circuit_breaker_status().await;
        assert_eq!(status.state, CircuitBreakerState::Closed);
        assert_eq!(status.failure_count, 0);
        assert_eq!(status.success_count, 0);
    }

    #[tokio::test]
    async fn test_reset_circuit_breaker() {
        let recovery = initialized_recovery().await;

        let outcome = recovery.reset_circuit_breaker().await;
        assert!(outcome.is_ok());

        let status = recovery.get_circuit_breaker_status().await;
        assert_eq!(status.state, CircuitBreakerState::Closed);
    }

    #[tokio::test]
    async fn test_reset_circuit_breaker_before_initialization() {
        let recovery = ErrorRecovery::new();

        let outcome = recovery.reset_circuit_breaker().await;
        assert!(outcome.is_err());
    }
}