leptos_sync_core/reliability/
circuit_breaker.rs1use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Clone)]
16pub struct CircuitBreaker {
17 state: Arc<RwLock<CircuitBreakerState>>,
19 config: BreakerConfig,
21 initialized: bool,
23}
24
25impl CircuitBreaker {
26 pub fn new() -> Self {
28 Self {
29 state: Arc::new(RwLock::new(CircuitBreakerState::new())),
30 config: BreakerConfig::default(),
31 initialized: false,
32 }
33 }
34
35 pub fn with_config(config: BreakerConfig) -> Self {
37 Self {
38 state: Arc::new(RwLock::new(CircuitBreakerState::new())),
39 config,
40 initialized: false,
41 }
42 }
43
44 pub async fn initialize(&mut self) -> Result<(), BreakerError> {
46 let mut state = self.state.write().await;
47 state.reset();
48 self.initialized = true;
49 Ok(())
50 }
51
52 pub async fn shutdown(&mut self) -> Result<(), BreakerError> {
54 self.initialized = false;
55 Ok(())
56 }
57
58 pub fn is_initialized(&self) -> bool {
60 self.initialized
61 }
62
63 pub async fn can_execute(&self) -> Result<bool, BreakerError> {
65 if !self.initialized {
66 return Err(BreakerError::NotInitialized);
67 }
68
69 let state = self.state.read().await;
70 Ok(state.can_execute())
71 }
72
73 pub async fn execute<F, T, E>(&self, operation: F) -> Result<T, BreakerError>
75 where
76 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
77 E: std::error::Error + Send + Sync + 'static,
78 {
79 if !self.initialized {
80 return Err(BreakerError::NotInitialized);
81 }
82
83 if !self.can_execute().await? {
85 return Err(BreakerError::CircuitOpen);
86 }
87
88 let result = operation().await;
90
91 let mut state = self.state.write().await;
93 match result {
94 Ok(_) => {
95 state.record_success();
96 Ok(result.unwrap())
97 }
98 Err(_) => {
99 state.record_failure();
100 Err(BreakerError::OperationFailed)
101 }
102 }
103 }
104
105 pub async fn record_success(&self) -> Result<(), BreakerError> {
107 if !self.initialized {
108 return Err(BreakerError::NotInitialized);
109 }
110
111 let mut state = self.state.write().await;
112 state.record_success();
113 Ok(())
114 }
115
116 pub async fn record_failure(&self) -> Result<(), BreakerError> {
118 if !self.initialized {
119 return Err(BreakerError::NotInitialized);
120 }
121
122 let mut state = self.state.write().await;
123 state.record_failure();
124 Ok(())
125 }
126
127 pub async fn get_state(&self) -> CircuitState {
129 let state = self.state.read().await;
130 state.get_state()
131 }
132
133 pub async fn get_status(&self) -> Result<CircuitBreakerStatus, BreakerError> {
135 if !self.initialized {
136 return Err(BreakerError::NotInitialized);
137 }
138
139 let state = self.state.read().await;
140 Ok(state.get_status())
141 }
142
143 pub async fn reset(&self) -> Result<(), BreakerError> {
145 if !self.initialized {
146 return Err(BreakerError::NotInitialized);
147 }
148
149 let mut state = self.state.write().await;
150 state.reset();
151 Ok(())
152 }
153}
154
155#[derive(Debug, Clone)]
157struct CircuitBreakerState {
158 state: CircuitState,
160 failure_count: usize,
162 success_count: usize,
164 last_failure_time: Option<Instant>,
166 last_success_time: Option<Instant>,
168}
169
170impl CircuitBreakerState {
171 fn new() -> Self {
173 Self {
174 state: CircuitState::Closed,
175 failure_count: 0,
176 success_count: 0,
177 last_failure_time: None,
178 last_success_time: None,
179 }
180 }
181
182 fn can_execute(&self) -> bool {
184 match self.state {
185 CircuitState::Closed => true,
186 CircuitState::Open => {
187 if let Some(last_failure) = self.last_failure_time {
188 last_failure.elapsed() >= Duration::from_secs(60) } else {
190 false
191 }
192 }
193 CircuitState::HalfOpen => true,
194 }
195 }
196
197 fn record_success(&mut self) {
199 self.success_count += 1;
200 self.failure_count = 0;
201 self.last_success_time = Some(Instant::now());
202
203 if self.state == CircuitState::HalfOpen && self.success_count >= 3 {
204 self.state = CircuitState::Closed;
205 self.success_count = 0;
206 }
207 }
208
209 fn record_failure(&mut self) {
211 self.failure_count += 1;
212 self.success_count = 0;
213 self.last_failure_time = Some(Instant::now());
214
215 if self.failure_count >= 5 {
216 self.state = CircuitState::Open;
217 } else if self.state == CircuitState::HalfOpen {
218 self.state = CircuitState::Open;
219 }
220 }
221
222 fn get_state(&self) -> CircuitState {
224 self.state.clone()
225 }
226
227 fn get_status(&self) -> CircuitBreakerStatus {
229 CircuitBreakerStatus {
230 state: self.state.clone(),
231 failure_count: self.failure_count,
232 success_count: self.success_count,
233 last_failure_time: self.last_failure_time,
234 last_success_time: self.last_success_time,
235 }
236 }
237
238 fn reset(&mut self) {
240 self.state = CircuitState::Closed;
241 self.failure_count = 0;
242 self.success_count = 0;
243 self.last_failure_time = None;
244 self.last_success_time = None;
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
250pub enum CircuitState {
251 Closed,
253 Open,
255 HalfOpen,
257}
258
259#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
261pub struct CircuitBreakerStatus {
262 pub state: CircuitState,
264 pub failure_count: usize,
266 pub success_count: usize,
268 #[serde(skip_serializing, skip_deserializing)]
270 pub last_failure_time: Option<Instant>,
271 #[serde(skip_serializing, skip_deserializing)]
273 pub last_success_time: Option<Instant>,
274}
275
276#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
278pub struct BreakerConfig {
279 pub failure_threshold: usize,
281 pub success_threshold: usize,
283 pub timeout: Duration,
285 pub enabled: bool,
287}
288
289impl Default for BreakerConfig {
290 fn default() -> Self {
291 Self {
292 failure_threshold: 5,
293 success_threshold: 3,
294 timeout: Duration::from_secs(60),
295 enabled: true,
296 }
297 }
298}
299
300#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
302pub enum BreakerError {
303 NotInitialized,
305 CircuitOpen,
307 OperationFailed,
309 ConfigurationError(String),
311}
312
313impl std::fmt::Display for BreakerError {
314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315 match self {
316 BreakerError::NotInitialized => write!(f, "Circuit breaker not initialized"),
317 BreakerError::CircuitOpen => write!(f, "Circuit breaker is open"),
318 BreakerError::OperationFailed => write!(f, "Operation failed"),
319 BreakerError::ConfigurationError(msg) => write!(f, "Configuration error: {}", msg),
320 }
321 }
322}
323
324impl std::error::Error for BreakerError {}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Boxed future type accepted by `CircuitBreaker::execute` in these tests.
    type BoxedOutcome =
        std::pin::Pin<Box<dyn std::future::Future<Output = Result<i32, std::io::Error>> + Send>>;

    /// Operation that always resolves to `Ok(42)`.
    fn succeeding_op() -> BoxedOutcome {
        Box::pin(async { Ok::<i32, std::io::Error>(42) })
    }

    /// Operation that always resolves to an I/O error.
    fn failing_op() -> BoxedOutcome {
        Box::pin(async {
            Err::<i32, std::io::Error>(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Operation failed",
            ))
        })
    }

    /// Default-configured breaker that has already been initialized.
    async fn ready_breaker() -> CircuitBreaker {
        let mut breaker = CircuitBreaker::new();
        breaker.initialize().await.unwrap();
        breaker
    }

    /// Runs `attempts` failing operations through the breaker, ignoring the results.
    async fn run_failures(breaker: &CircuitBreaker, attempts: usize) {
        for _ in 0..attempts {
            let _ = breaker.execute(failing_op).await;
        }
    }

    #[tokio::test]
    async fn test_circuit_breaker_creation() {
        assert!(!CircuitBreaker::new().is_initialized());
    }

    #[tokio::test]
    async fn test_circuit_breaker_initialization() {
        let mut breaker = CircuitBreaker::new();
        assert!(breaker.initialize().await.is_ok());
        assert!(breaker.is_initialized());
    }

    #[tokio::test]
    async fn test_circuit_breaker_shutdown() {
        let mut breaker = ready_breaker().await;
        assert!(breaker.shutdown().await.is_ok());
        assert!(!breaker.is_initialized());
    }

    #[tokio::test]
    async fn test_circuit_breaker_can_execute() {
        let breaker = ready_breaker().await;
        assert!(breaker.can_execute().await.unwrap());
    }

    #[tokio::test]
    async fn test_circuit_breaker_success() {
        let breaker = ready_breaker().await;

        let outcome = breaker.execute(succeeding_op).await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 42);

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.state, CircuitState::Closed);
        assert_eq!(snapshot.failure_count, 0);
        assert_eq!(snapshot.success_count, 1);
    }

    #[tokio::test]
    async fn test_circuit_breaker_failure() {
        let breaker = ready_breaker().await;

        let outcome = breaker.execute(failing_op).await;
        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), BreakerError::OperationFailed));

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.state, CircuitState::Closed);
        assert_eq!(snapshot.failure_count, 1);
        assert_eq!(snapshot.success_count, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_opens_after_failures() {
        let breaker = ready_breaker().await;
        run_failures(&breaker, 5).await;

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.state, CircuitState::Open);
        assert_eq!(snapshot.failure_count, 5);

        // With the circuit open, even an operation that would succeed is rejected.
        let rejected = breaker.execute(succeeding_op).await;
        assert!(rejected.is_err());
        assert!(matches!(rejected.unwrap_err(), BreakerError::CircuitOpen));
    }

    #[tokio::test]
    async fn test_circuit_breaker_reset() {
        let breaker = ready_breaker().await;
        run_failures(&breaker, 5).await;

        let tripped = breaker.get_status().await.unwrap();
        assert_eq!(tripped.state, CircuitState::Open);

        breaker.reset().await.unwrap();

        let cleared = breaker.get_status().await.unwrap();
        assert_eq!(cleared.state, CircuitState::Closed);
        assert_eq!(cleared.failure_count, 0);
        assert_eq!(cleared.success_count, 0);

        assert!(breaker.can_execute().await.unwrap());
    }

    #[tokio::test]
    async fn test_circuit_breaker_record_success() {
        let breaker = ready_breaker().await;
        breaker.record_success().await.unwrap();

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.success_count, 1);
        assert_eq!(snapshot.failure_count, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_record_failure() {
        let breaker = ready_breaker().await;
        breaker.record_failure().await.unwrap();

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.failure_count, 1);
        assert_eq!(snapshot.success_count, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_with_config() {
        let mut breaker = CircuitBreaker::with_config(BreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout: Duration::from_secs(30),
            enabled: true,
        });
        breaker.initialize().await.unwrap();

        for _ in 0..3 {
            breaker.record_failure().await.unwrap();
        }

        let snapshot = breaker.get_status().await.unwrap();
        assert_eq!(snapshot.state, CircuitState::Open);
        assert_eq!(snapshot.failure_count, 3);
    }

    #[tokio::test]
    async fn test_circuit_breaker_not_initialized() {
        let breaker = CircuitBreaker::new();

        let probe = breaker.can_execute().await;
        assert!(probe.is_err());
        assert!(matches!(probe.unwrap_err(), BreakerError::NotInitialized));

        let attempt = breaker.execute(succeeding_op).await;
        assert!(attempt.is_err());
        assert!(matches!(attempt.unwrap_err(), BreakerError::NotInitialized));
    }

    #[test]
    fn test_breaker_config_default() {
        let defaults = BreakerConfig::default();
        assert_eq!(defaults.failure_threshold, 5);
        assert_eq!(defaults.success_threshold, 3);
        assert_eq!(defaults.timeout, Duration::from_secs(60));
        assert!(defaults.enabled);
    }

    #[test]
    fn test_circuit_state() {
        assert_eq!(CircuitState::Closed, CircuitState::Closed);
        assert_eq!(CircuitState::Open, CircuitState::Open);
        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
    }

    #[test]
    fn test_breaker_error_display() {
        let rendered = BreakerError::CircuitOpen.to_string();
        assert!(rendered.contains("Circuit breaker is open"));
    }
}