opendev_http/
circuit_breaker.rs1use std::sync::Mutex;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::{Deserialize, Serialize};
17use tracing::{debug, info, warn};
18
19use crate::models::HttpError;
20
/// The three states a circuit breaker can report.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Requests are allowed through.
    Closed,
    /// Requests are rejected.
    Open,
    /// Requests are allowed through again as probes.
    HalfOpen,
}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct CircuitBreakerConfig {
38 pub failure_threshold: u32,
40 pub reset_timeout_secs: u64,
42 pub probe_interval_secs: u64,
45}
46
47impl Default for CircuitBreakerConfig {
48 fn default() -> Self {
49 Self {
50 failure_threshold: 5,
51 reset_timeout_secs: 30,
52 probe_interval_secs: 30,
53 }
54 }
55}
56
/// Per-provider circuit breaker driven by a count of recorded failures.
///
/// All methods take `&self`; the mutable parts are an atomic counter and a
/// mutex-guarded timestamp.
pub struct CircuitBreaker {
    /// Failures recorded since the last success or reset.
    failure_count: AtomicU32,
    /// Failure count at which the circuit is considered open.
    threshold: u32,
    /// When the most recent failure was recorded; `None` until the first
    /// failure and again after a reset.
    last_failure: Mutex<Option<Instant>>,
    /// How long after the last failure the circuit stays open before it is
    /// reported as half-open.
    cooldown: Duration,
    /// Provider name, used in log fields and in the rejection error message.
    provider: String,
}
71
72impl CircuitBreaker {
73 pub fn new(provider: impl Into<String>, threshold: u32, cooldown: Duration) -> Self {
79 Self {
80 failure_count: AtomicU32::new(0),
81 threshold,
82 last_failure: Mutex::new(None),
83 cooldown,
84 provider: provider.into(),
85 }
86 }
87
88 pub fn with_defaults(provider: impl Into<String>) -> Self {
90 Self::new(provider, 5, Duration::from_secs(30))
91 }
92
93 pub fn from_config(provider: impl Into<String>, config: &CircuitBreakerConfig) -> Self {
95 Self::new(
96 provider,
97 config.failure_threshold,
98 Duration::from_secs(config.reset_timeout_secs),
99 )
100 }
101
102 pub fn state(&self) -> CircuitState {
104 let failures = self.failure_count.load(Ordering::Relaxed);
105
106 if failures < self.threshold {
107 return CircuitState::Closed;
108 }
109
110 let lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
112 match *lock {
113 Some(last) if last.elapsed() >= self.cooldown => CircuitState::HalfOpen,
114 _ => CircuitState::Open,
115 }
116 }
117
118 pub fn check(&self) -> Result<(), HttpError> {
123 match self.state() {
124 CircuitState::Closed => Ok(()),
125 CircuitState::HalfOpen => {
126 debug!(
127 provider = %self.provider,
128 "Circuit half-open, allowing probe request"
129 );
130 Ok(())
131 }
132 CircuitState::Open => {
133 let remaining = {
134 let lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
135 lock.map(|last| self.cooldown.saturating_sub(last.elapsed()))
136 .unwrap_or(self.cooldown)
137 };
138 warn!(
139 provider = %self.provider,
140 remaining_secs = remaining.as_secs(),
141 "Circuit open, rejecting request"
142 );
143 Err(HttpError::Other(format!(
144 "Circuit breaker open for provider '{}'. \
145 Too many consecutive failures ({}). \
146 Will retry in {}s.",
147 self.provider,
148 self.failure_count.load(Ordering::Relaxed),
149 remaining.as_secs(),
150 )))
151 }
152 }
153 }
154
155 pub fn record_success(&self) {
158 let prev = self.failure_count.swap(0, Ordering::Relaxed);
159 if prev >= self.threshold {
160 info!(
161 provider = %self.provider,
162 "Circuit breaker closed after successful probe"
163 );
164 }
165 }
166
167 pub fn record_failure(&self) {
170 let new_count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
171
172 {
174 let mut lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
175 *lock = Some(Instant::now());
176 }
177
178 if new_count == self.threshold {
179 warn!(
180 provider = %self.provider,
181 threshold = self.threshold,
182 cooldown_secs = self.cooldown.as_secs(),
183 "Circuit breaker opened after {} consecutive failures",
184 self.threshold
185 );
186 }
187 }
188
189 pub fn failure_count(&self) -> u32 {
191 self.failure_count.load(Ordering::Relaxed)
192 }
193
194 pub fn reset(&self) {
196 self.failure_count.store(0, Ordering::Relaxed);
197 let mut lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
198 *lock = None;
199 }
200}
201
202impl std::fmt::Debug for CircuitBreaker {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("CircuitBreaker")
205 .field("provider", &self.provider)
206 .field("state", &self.state())
207 .field("failure_count", &self.failure_count.load(Ordering::Relaxed))
208 .field("threshold", &self.threshold)
209 .field("cooldown", &self.cooldown)
210 .finish()
211 }
212}
213
// Compiled only for test builds; the module body is loaded from the sibling
// file named by `#[path]` rather than the default `tests.rs` / `tests/mod.rs`.
#[cfg(test)]
#[path = "circuit_breaker_tests.rs"]
mod tests;