scirs2_core/error/
circuitbreaker.rs1use crate::error::{CoreResult as Result, ErrorContext};
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum CircuitState {
11 Closed,
13 Open,
15 HalfOpen,
17}
18
19#[derive(Debug, Clone)]
21pub struct CircuitBreakerStatus {
22 pub state: CircuitState,
23 pub failure_count: usize,
24 pub success_count: usize,
25 pub last_state_change: Instant,
26}
27
28#[derive(Debug, Clone)]
30pub enum FallbackStrategy {
31 Default,
33 Cache,
35 Alternative,
37 FailFast,
39}
40
41#[derive(Debug, Clone)]
43pub struct RetryPolicy {
44 pub max_retries: usize,
45 pub initial_delay: Duration,
46 pub max_delay: Duration,
47 pub exponential_base: f64,
48}
49
50impl Default for RetryPolicy {
51 fn default() -> Self {
52 Self {
53 max_retries: 3,
54 initial_delay: Duration::from_millis(100),
55 max_delay: Duration::from_secs(10),
56 exponential_base: 2.0,
57 }
58 }
59}
60
61pub struct CircuitBreaker {
63 state: Arc<Mutex<CircuitState>>,
65 failure_count: AtomicUsize,
67 success_count: AtomicUsize,
69 last_state_change: Arc<Mutex<Instant>>,
71 config: CircuitBreakerConfig,
73}
74
75#[derive(Debug, Clone)]
77pub struct CircuitBreakerConfig {
78 pub failure_threshold: usize,
80 pub success_threshold: usize,
82 pub timeout: Duration,
84}
85
86impl Default for CircuitBreakerConfig {
87 fn default() -> Self {
88 Self {
89 failure_threshold: 5,
90 success_threshold: 3,
91 timeout: Duration::from_secs(30),
92 }
93 }
94}
95
96impl CircuitBreaker {
97 pub fn new(config: CircuitBreakerConfig) -> Self {
99 Self {
100 state: Arc::new(Mutex::new(CircuitState::Closed)),
101 failure_count: AtomicUsize::new(0),
102 success_count: AtomicUsize::new(0),
103 last_state_change: Arc::new(Mutex::new(Instant::now())),
104 config,
105 }
106 }
107
108 pub fn state(&self) -> CircuitState {
110 *self.state.lock().expect("Operation failed")
111 }
112
113 pub fn record_success(&self) {
115 let mut state = self.state.lock().expect("Operation failed");
116 match *state {
117 CircuitState::HalfOpen => {
118 let count = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
119 if count >= self.config.success_threshold {
120 *state = CircuitState::Closed;
121 self.failure_count.store(0, Ordering::SeqCst);
122 self.success_count.store(0, Ordering::SeqCst);
123 *self.last_state_change.lock().expect("Operation failed") = Instant::now();
124 }
125 }
126 CircuitState::Closed => {
127 self.failure_count.store(0, Ordering::SeqCst);
128 }
129 CircuitState::Open => {}
130 }
131 }
132
133 pub fn record_failure(&self) {
135 let mut state = self.state.lock().expect("Operation failed");
136 match *state {
137 CircuitState::Closed => {
138 let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
139 if count >= self.config.failure_threshold {
140 *state = CircuitState::Open;
141 *self.last_state_change.lock().expect("Operation failed") = Instant::now();
142 }
143 }
144 CircuitState::HalfOpen => {
145 *state = CircuitState::Open;
146 self.failure_count.store(0, Ordering::SeqCst);
147 self.success_count.store(0, Ordering::SeqCst);
148 *self.last_state_change.lock().expect("Operation failed") = Instant::now();
149 }
150 CircuitState::Open => {}
151 }
152 }
153
154 pub fn check_state(&self) {
156 let mut state = self.state.lock().expect("Operation failed");
157 if *state == CircuitState::Open {
158 let elapsed = self
159 .last_state_change
160 .lock()
161 .expect("Operation failed")
162 .elapsed();
163 if elapsed >= self.config.timeout {
164 *state = CircuitState::HalfOpen;
165 self.success_count.store(0, Ordering::SeqCst);
166 *self.last_state_change.lock().expect("Operation failed") = Instant::now();
167 }
168 }
169 }
170
171 pub fn is_allowed(&self) -> bool {
173 self.check_state();
174 let state = self.state();
175 state == CircuitState::Closed || state == CircuitState::HalfOpen
176 }
177
178 pub fn status(&self) -> CircuitBreakerStatus {
180 CircuitBreakerStatus {
181 state: self.state(),
182 failure_count: self.failure_count.load(Ordering::SeqCst),
183 success_count: self.success_count.load(Ordering::SeqCst),
184 last_state_change: *self.last_state_change.lock().expect("Operation failed"),
185 }
186 }
187
188 pub fn execute<F, T>(&self, f: F) -> Result<T>
190 where
191 F: FnOnce() -> Result<T>,
192 {
193 use crate::error::CoreError;
194
195 if !self.is_allowed() {
197 return Err(CoreError::ValueError(ErrorContext::new(
198 "Circuit breaker is open",
199 )));
200 }
201
202 match f() {
204 Ok(result) => {
205 self.record_success();
206 Ok(result)
207 }
208 Err(e) => {
209 self.record_failure();
210 Err(e)
211 }
212 }
213 }
214}
215
216pub struct RetryExecutor {
218 policy: RetryPolicy,
219}
220
221impl RetryExecutor {
222 pub fn new(policy: RetryPolicy) -> Self {
224 Self { policy }
225 }
226
227 pub fn execute<F, T>(&self, mut f: F) -> Result<T>
229 where
230 F: FnMut() -> Result<T>,
231 {
232 let mut last_error = None;
233 for _ in 0..self.policy.max_retries {
234 match f() {
235 Ok(result) => return Ok(result),
236 Err(e) => last_error = Some(e),
237 }
238 }
239 Err(last_error.expect("Operation failed"))
240 }
241}
242
243pub struct ResilientExecutor {
245 circuit_breaker: CircuitBreaker,
246 retry_executor: RetryExecutor,
247 fallback_strategy: FallbackStrategy,
248}
249
250impl ResilientExecutor {
251 pub fn new(
253 circuit_breaker: CircuitBreaker,
254 retry_executor: RetryExecutor,
255 fallback_strategy: FallbackStrategy,
256 ) -> Self {
257 Self {
258 circuit_breaker,
259 retry_executor,
260 fallback_strategy,
261 }
262 }
263
264 pub fn execute<F, T>(&self, f: F) -> Result<T>
266 where
267 F: FnMut() -> Result<T>,
268 {
269 if !self.circuit_breaker.is_allowed() {
270 return Err(crate::error::CoreError::ValueError(ErrorContext::new(
271 "Circuit breaker is open",
272 )));
273 }
274
275 match self.retry_executor.execute(f) {
276 Ok(result) => {
277 self.circuit_breaker.record_success();
278 Ok(result)
279 }
280 Err(e) => {
281 self.circuit_breaker.record_failure();
282 Err(e)
283 }
284 }
285 }
286}
287
288use once_cell::sync::Lazy;
290use std::collections::HashMap;
291use std::sync::RwLock;
292
293static CIRCUIT_BREAKERS: Lazy<RwLock<HashMap<String, Arc<CircuitBreaker>>>> =
294 Lazy::new(|| RwLock::new(HashMap::new()));
295
296pub fn get_circuitbreaker(name: &str) -> Option<Arc<CircuitBreaker>> {
298 CIRCUIT_BREAKERS
299 .read()
300 .expect("Operation failed")
301 .get(name)
302 .cloned()
303}
304
305pub fn list_circuitbreakers() -> Vec<String> {
307 CIRCUIT_BREAKERS
308 .read()
309 .expect("Operation failed")
310 .keys()
311 .cloned()
312 .collect()
313}