leptos_sync_core/reliability/
circuit_breaker.rs

1//! Circuit Breaker System
2//!
3//! This module provides circuit breaker functionality for fault tolerance including:
4//! - Circuit breaker pattern implementation
5//! - Configurable failure thresholds
6//! - Automatic recovery mechanisms
7//! - Circuit state monitoring
8
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use serde::{Deserialize, Serialize};
13
14/// Circuit breaker for fault tolerance
15#[derive(Debug, Clone)]
16pub struct CircuitBreaker {
17    /// Circuit breaker state
18    state: Arc<RwLock<CircuitBreakerState>>,
19    /// Circuit breaker configuration
20    config: BreakerConfig,
21    /// Whether the system is initialized
22    initialized: bool,
23}
24
25impl CircuitBreaker {
26    /// Create a new circuit breaker
27    pub fn new() -> Self {
28        Self {
29            state: Arc::new(RwLock::new(CircuitBreakerState::new())),
30            config: BreakerConfig::default(),
31            initialized: false,
32        }
33    }
34    
35    /// Create a new circuit breaker with configuration
36    pub fn with_config(config: BreakerConfig) -> Self {
37        Self {
38            state: Arc::new(RwLock::new(CircuitBreakerState::new())),
39            config,
40            initialized: false,
41        }
42    }
43    
44    /// Initialize the circuit breaker
45    pub async fn initialize(&mut self) -> Result<(), BreakerError> {
46        let mut state = self.state.write().await;
47        state.reset();
48        self.initialized = true;
49        Ok(())
50    }
51    
52    /// Shutdown the circuit breaker
53    pub async fn shutdown(&mut self) -> Result<(), BreakerError> {
54        self.initialized = false;
55        Ok(())
56    }
57    
58    /// Check if the system is initialized
59    pub fn is_initialized(&self) -> bool {
60        self.initialized
61    }
62    
63    /// Check if the circuit breaker allows execution
64    pub async fn can_execute(&self) -> Result<bool, BreakerError> {
65        if !self.initialized {
66            return Err(BreakerError::NotInitialized);
67        }
68        
69        let state = self.state.read().await;
70        Ok(state.can_execute())
71    }
72    
73    /// Execute an operation through the circuit breaker
74    pub async fn execute<F, T, E>(&self, operation: F) -> Result<T, BreakerError>
75    where
76        F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
77        E: std::error::Error + Send + Sync + 'static,
78    {
79        if !self.initialized {
80            return Err(BreakerError::NotInitialized);
81        }
82        
83        // Check if we can execute
84        if !self.can_execute().await? {
85            return Err(BreakerError::CircuitOpen);
86        }
87        
88        // Execute the operation
89        let result = operation().await;
90        
91        // Update circuit breaker state based on result
92        let mut state = self.state.write().await;
93        match result {
94            Ok(_) => {
95                state.record_success();
96                Ok(result.unwrap())
97            }
98            Err(_) => {
99                state.record_failure();
100                Err(BreakerError::OperationFailed)
101            }
102        }
103    }
104    
105    /// Record a successful operation
106    pub async fn record_success(&self) -> Result<(), BreakerError> {
107        if !self.initialized {
108            return Err(BreakerError::NotInitialized);
109        }
110        
111        let mut state = self.state.write().await;
112        state.record_success();
113        Ok(())
114    }
115    
116    /// Record a failed operation
117    pub async fn record_failure(&self) -> Result<(), BreakerError> {
118        if !self.initialized {
119            return Err(BreakerError::NotInitialized);
120        }
121        
122        let mut state = self.state.write().await;
123        state.record_failure();
124        Ok(())
125    }
126    
127    /// Get the current circuit breaker state
128    pub async fn get_state(&self) -> CircuitState {
129        let state = self.state.read().await;
130        state.get_state()
131    }
132    
133    /// Get circuit breaker status
134    pub async fn get_status(&self) -> Result<CircuitBreakerStatus, BreakerError> {
135        if !self.initialized {
136            return Err(BreakerError::NotInitialized);
137        }
138        
139        let state = self.state.read().await;
140        Ok(state.get_status())
141    }
142    
143    /// Reset the circuit breaker
144    pub async fn reset(&self) -> Result<(), BreakerError> {
145        if !self.initialized {
146            return Err(BreakerError::NotInitialized);
147        }
148        
149        let mut state = self.state.write().await;
150        state.reset();
151        Ok(())
152    }
153}
154
155/// Circuit breaker state
156#[derive(Debug, Clone)]
157struct CircuitBreakerState {
158    /// Current state
159    state: CircuitState,
160    /// Number of consecutive failures
161    failure_count: usize,
162    /// Number of consecutive successes
163    success_count: usize,
164    /// Last failure time
165    last_failure_time: Option<Instant>,
166    /// Last success time
167    last_success_time: Option<Instant>,
168}
169
170impl CircuitBreakerState {
171    /// Create a new circuit breaker state
172    fn new() -> Self {
173        Self {
174            state: CircuitState::Closed,
175            failure_count: 0,
176            success_count: 0,
177            last_failure_time: None,
178            last_success_time: None,
179        }
180    }
181    
182    /// Check if the circuit breaker allows execution
183    fn can_execute(&self) -> bool {
184        match self.state {
185            CircuitState::Closed => true,
186            CircuitState::Open => {
187                if let Some(last_failure) = self.last_failure_time {
188                    last_failure.elapsed() >= Duration::from_secs(60) // 1 minute timeout
189                } else {
190                    false
191                }
192            }
193            CircuitState::HalfOpen => true,
194        }
195    }
196    
197    /// Record a successful operation
198    fn record_success(&mut self) {
199        self.success_count += 1;
200        self.failure_count = 0;
201        self.last_success_time = Some(Instant::now());
202        
203        if self.state == CircuitState::HalfOpen && self.success_count >= 3 {
204            self.state = CircuitState::Closed;
205            self.success_count = 0;
206        }
207    }
208    
209    /// Record a failed operation
210    fn record_failure(&mut self) {
211        self.failure_count += 1;
212        self.success_count = 0;
213        self.last_failure_time = Some(Instant::now());
214        
215        if self.failure_count >= 5 {
216            self.state = CircuitState::Open;
217        } else if self.state == CircuitState::HalfOpen {
218            self.state = CircuitState::Open;
219        }
220    }
221    
222    /// Get the current state
223    fn get_state(&self) -> CircuitState {
224        self.state.clone()
225    }
226    
227    /// Get circuit breaker status
228    fn get_status(&self) -> CircuitBreakerStatus {
229        CircuitBreakerStatus {
230            state: self.state.clone(),
231            failure_count: self.failure_count,
232            success_count: self.success_count,
233            last_failure_time: self.last_failure_time,
234            last_success_time: self.last_success_time,
235        }
236    }
237    
238    /// Reset the circuit breaker state
239    fn reset(&mut self) {
240        self.state = CircuitState::Closed;
241        self.failure_count = 0;
242        self.success_count = 0;
243        self.last_failure_time = None;
244        self.last_success_time = None;
245    }
246}
247
248/// Circuit breaker states
249#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
250pub enum CircuitState {
251    /// Circuit is closed - operations are allowed
252    Closed,
253    /// Circuit is open - operations are blocked
254    Open,
255    /// Circuit is half-open - testing if service is recovered
256    HalfOpen,
257}
258
259/// Circuit breaker status
260#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
261pub struct CircuitBreakerStatus {
262    /// Current state
263    pub state: CircuitState,
264    /// Number of consecutive failures
265    pub failure_count: usize,
266    /// Number of consecutive successes
267    pub success_count: usize,
268    /// Last failure time (serialized as timestamp)
269    #[serde(skip_serializing, skip_deserializing)]
270    pub last_failure_time: Option<Instant>,
271    /// Last success time (serialized as timestamp)
272    #[serde(skip_serializing, skip_deserializing)]
273    pub last_success_time: Option<Instant>,
274}
275
276/// Circuit breaker configuration
277#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
278pub struct BreakerConfig {
279    /// Failure threshold to open circuit
280    pub failure_threshold: usize,
281    /// Success threshold to close circuit
282    pub success_threshold: usize,
283    /// Timeout for half-open state
284    pub timeout: Duration,
285    /// Enable circuit breaker
286    pub enabled: bool,
287}
288
289impl Default for BreakerConfig {
290    fn default() -> Self {
291        Self {
292            failure_threshold: 5,
293            success_threshold: 3,
294            timeout: Duration::from_secs(60),
295            enabled: true,
296        }
297    }
298}
299
300/// Circuit breaker errors
301#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
302pub enum BreakerError {
303    /// System not initialized
304    NotInitialized,
305    /// Circuit breaker is open
306    CircuitOpen,
307    /// Operation failed
308    OperationFailed,
309    /// Configuration error
310    ConfigurationError(String),
311}
312
313impl std::fmt::Display for BreakerError {
314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315        match self {
316            BreakerError::NotInitialized => write!(f, "Circuit breaker not initialized"),
317            BreakerError::CircuitOpen => write!(f, "Circuit breaker is open"),
318            BreakerError::OperationFailed => write!(f, "Operation failed"),
319            BreakerError::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
320        }
321    }
322}
323
324impl std::error::Error for BreakerError {}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329    
330    #[tokio::test]
331    async fn test_circuit_breaker_creation() {
332        let breaker = CircuitBreaker::new();
333        assert!(!breaker.is_initialized());
334    }
335    
336    #[tokio::test]
337    async fn test_circuit_breaker_initialization() {
338        let mut breaker = CircuitBreaker::new();
339        let result = breaker.initialize().await;
340        assert!(result.is_ok());
341        assert!(breaker.is_initialized());
342    }
343    
344    #[tokio::test]
345    async fn test_circuit_breaker_shutdown() {
346        let mut breaker = CircuitBreaker::new();
347        breaker.initialize().await.unwrap();
348        let result = breaker.shutdown().await;
349        assert!(result.is_ok());
350        assert!(!breaker.is_initialized());
351    }
352    
353    #[tokio::test]
354    async fn test_circuit_breaker_can_execute() {
355        let mut breaker = CircuitBreaker::new();
356        breaker.initialize().await.unwrap();
357        
358        let can_execute = breaker.can_execute().await.unwrap();
359        assert!(can_execute);
360    }
361    
362    #[tokio::test]
363    async fn test_circuit_breaker_success() {
364        let mut breaker = CircuitBreaker::new();
365        breaker.initialize().await.unwrap();
366        
367        let result = breaker.execute(|| {
368            Box::pin(async { Ok::<i32, std::io::Error>(42) })
369        }).await;
370        
371        assert!(result.is_ok());
372        assert_eq!(result.unwrap(), 42);
373        
374        let status = breaker.get_status().await.unwrap();
375        assert_eq!(status.state, CircuitState::Closed);
376        assert_eq!(status.failure_count, 0);
377        assert_eq!(status.success_count, 1);
378    }
379    
380    #[tokio::test]
381    async fn test_circuit_breaker_failure() {
382        let mut breaker = CircuitBreaker::new();
383        breaker.initialize().await.unwrap();
384        
385        let result = breaker.execute(|| {
386            Box::pin(async { Err::<i32, std::io::Error>(std::io::Error::new(std::io::ErrorKind::Other, "Operation failed")) })
387        }).await;
388        
389        assert!(result.is_err());
390        assert!(matches!(result.unwrap_err(), BreakerError::OperationFailed));
391        
392        let status = breaker.get_status().await.unwrap();
393        assert_eq!(status.state, CircuitState::Closed);
394        assert_eq!(status.failure_count, 1);
395        assert_eq!(status.success_count, 0);
396    }
397    
398    #[tokio::test]
399    async fn test_circuit_breaker_opens_after_failures() {
400        let mut breaker = CircuitBreaker::new();
401        breaker.initialize().await.unwrap();
402        
403        // Record multiple failures to open the circuit
404        for _ in 0..5 {
405            let _ = breaker.execute(|| {
406                Box::pin(async { Err::<i32, std::io::Error>(std::io::Error::new(std::io::ErrorKind::Other, "Operation failed")) })
407            }).await;
408        }
409        
410        let status = breaker.get_status().await.unwrap();
411        assert_eq!(status.state, CircuitState::Open);
412        assert_eq!(status.failure_count, 5);
413        
414        // Next operation should fail due to circuit being open
415        let result = breaker.execute(|| {
416            Box::pin(async { Ok::<i32, std::io::Error>(42) })
417        }).await;
418        
419        assert!(result.is_err());
420        assert!(matches!(result.unwrap_err(), BreakerError::CircuitOpen));
421    }
422    
423    #[tokio::test]
424    async fn test_circuit_breaker_reset() {
425        let mut breaker = CircuitBreaker::new();
426        breaker.initialize().await.unwrap();
427        
428        // Open the circuit
429        for _ in 0..5 {
430            let _ = breaker.execute(|| {
431                Box::pin(async { Err::<i32, std::io::Error>(std::io::Error::new(std::io::ErrorKind::Other, "Operation failed")) })
432            }).await;
433        }
434        
435        let status = breaker.get_status().await.unwrap();
436        assert_eq!(status.state, CircuitState::Open);
437        
438        // Reset the circuit
439        breaker.reset().await.unwrap();
440        
441        let status = breaker.get_status().await.unwrap();
442        assert_eq!(status.state, CircuitState::Closed);
443        assert_eq!(status.failure_count, 0);
444        assert_eq!(status.success_count, 0);
445        
446        // Should be able to execute again
447        let can_execute = breaker.can_execute().await.unwrap();
448        assert!(can_execute);
449    }
450    
451    #[tokio::test]
452    async fn test_circuit_breaker_record_success() {
453        let mut breaker = CircuitBreaker::new();
454        breaker.initialize().await.unwrap();
455        
456        breaker.record_success().await.unwrap();
457        
458        let status = breaker.get_status().await.unwrap();
459        assert_eq!(status.success_count, 1);
460        assert_eq!(status.failure_count, 0);
461    }
462    
463    #[tokio::test]
464    async fn test_circuit_breaker_record_failure() {
465        let mut breaker = CircuitBreaker::new();
466        breaker.initialize().await.unwrap();
467        
468        breaker.record_failure().await.unwrap();
469        
470        let status = breaker.get_status().await.unwrap();
471        assert_eq!(status.failure_count, 1);
472        assert_eq!(status.success_count, 0);
473    }
474    
475    #[tokio::test]
476    async fn test_circuit_breaker_with_config() {
477        let config = BreakerConfig {
478            failure_threshold: 3,
479            success_threshold: 2,
480            timeout: Duration::from_secs(30),
481            enabled: true,
482        };
483        
484        let mut breaker = CircuitBreaker::with_config(config);
485        breaker.initialize().await.unwrap();
486        
487        // Should open after 3 failures
488        for _ in 0..3 {
489            breaker.record_failure().await.unwrap();
490        }
491        
492        let status = breaker.get_status().await.unwrap();
493        assert_eq!(status.state, CircuitState::Open);
494        assert_eq!(status.failure_count, 3);
495    }
496    
497    #[tokio::test]
498    async fn test_circuit_breaker_not_initialized() {
499        let breaker = CircuitBreaker::new();
500        
501        let result = breaker.can_execute().await;
502        assert!(result.is_err());
503        assert!(matches!(result.unwrap_err(), BreakerError::NotInitialized));
504        
505        let result = breaker.execute(|| {
506            Box::pin(async { Ok::<i32, std::io::Error>(42) })
507        }).await;
508        assert!(result.is_err());
509        assert!(matches!(result.unwrap_err(), BreakerError::NotInitialized));
510    }
511    
512    #[test]
513    fn test_breaker_config_default() {
514        let config = BreakerConfig::default();
515        assert_eq!(config.failure_threshold, 5);
516        assert_eq!(config.success_threshold, 3);
517        assert_eq!(config.timeout, Duration::from_secs(60));
518        assert!(config.enabled);
519    }
520    
521    #[test]
522    fn test_circuit_state() {
523        assert_eq!(CircuitState::Closed, CircuitState::Closed);
524        assert_eq!(CircuitState::Open, CircuitState::Open);
525        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
526    }
527    
528    #[test]
529    fn test_breaker_error_display() {
530        let error = BreakerError::CircuitOpen;
531        let error_string = format!("{}", error);
532        assert!(error_string.contains("Circuit breaker is open"));
533    }
534}