1pub use arcly_http_macros::circuit_breaker;
9
10pub mod bulkhead;
11pub mod distributed_rate_limit;
12pub mod dlock;
13pub mod rate_limit;
14pub mod timeout;
15pub use bulkhead::Bulkhead;
16pub use distributed_rate_limit::{
17 DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
18};
19pub use dlock::{DLockBackend, DistributedLock, LockGuard};
20pub use rate_limit::RateLimit;
21
22use std::error::Error as StdError;
23use std::fmt;
24use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
25use std::time::Duration;
26
27use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
28
/// Breaker state: calls run normally and failures are counted.
const CLOSED: u8 = 0;
/// Breaker state: calls are rejected until the cooldown has elapsed.
const OPEN: u8 = 1;
/// Breaker state: one probe call is in flight; every other caller is rejected.
const HALF_OPEN: u8 = 2;
32
/// Error returned by [`CircuitBreaker::execute`] when a call is rejected
/// without the action being run.
///
/// The type carries no data, so it also derives `PartialEq`/`Eq`; callers and
/// tests can compare results directly instead of pattern-matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BreakerOpen;
41
42impl fmt::Display for BreakerOpen {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "circuit breaker open")
45 }
46}
47impl StdError for BreakerOpen {}
48impl HttpError for BreakerOpen {
49 fn problem(&self) -> ProblemDetails {
50 ServiceUnavailable::new("circuit breaker open").problem()
51 }
52}
53
54impl From<BreakerOpen> for ServiceUnavailable {
55 fn from(_: BreakerOpen) -> Self {
56 ServiceUnavailable::new("circuit breaker open")
57 }
58}
59
/// Lock-free circuit breaker whose whole state lives in atomics, so a single
/// instance can be shared by reference between concurrent callers.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state: one of `CLOSED`, `OPEN` or `HALF_OPEN`.
    state: AtomicU8,
    /// Number of recorded failures at which the breaker trips open.
    failure_threshold: u32,
    /// Failures recorded so far; cleared when a probe succeeds.
    failure_count: AtomicU32,
    /// Time the breaker stays open before a probe is admitted, in milliseconds.
    cooldown_millis: u64,
    /// Label attached to the transition metric and log line.
    name: &'static str,
    /// Timestamp of the most recent transition to open, in nanoseconds on the
    /// breaker's internal monotonic clock.
    last_state_change_nanos: AtomicU64,
}
73
74impl CircuitBreaker {
75 pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
77 Self::const_new(failure_threshold, cooldown.as_millis() as u64)
78 }
79
80 pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
82 Self::const_named("", failure_threshold, cooldown_millis)
83 }
84
85 pub const fn const_named(
88 name: &'static str,
89 failure_threshold: u32,
90 cooldown_millis: u64,
91 ) -> Self {
92 Self {
93 state: AtomicU8::new(CLOSED),
94 failure_threshold,
95 failure_count: AtomicU32::new(0),
96 cooldown_millis,
97 name,
98 last_state_change_nanos: AtomicU64::new(0),
99 }
100 }
101
102 fn note_transition(&self, to: &'static str) {
103 metrics::counter!(
104 "circuit_breaker_transitions_total",
105 "name" => self.name, "to" => to
106 )
107 .increment(1);
108 tracing::warn!(breaker = self.name, to, "circuit breaker state change");
109 }
110
111 #[inline]
112 pub fn state(&self) -> u8 {
113 self.state.load(Ordering::Relaxed)
114 }
115
116 fn now_nanos() -> u64 {
117 use std::sync::OnceLock;
120 use std::time::Instant;
121 static EPOCH: OnceLock<Instant> = OnceLock::new();
122 let epoch = EPOCH.get_or_init(Instant::now);
123 epoch.elapsed().as_nanos() as u64
124 }
125
126 pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
140 where
141 F: FnMut() -> Fut,
142 Fut: std::future::Future<Output = Result<T, E>>,
143 {
144 let mut probing = false;
146
147 match self.state.load(Ordering::SeqCst) {
148 OPEN => {
149 let elapsed_ms = (Self::now_nanos()
150 .saturating_sub(self.last_state_change_nanos.load(Ordering::Relaxed)))
151 / 1_000_000;
152 if elapsed_ms <= self.cooldown_millis {
153 return Err(BreakerOpen);
154 }
155 if self
158 .state
159 .compare_exchange(OPEN, HALF_OPEN, Ordering::SeqCst, Ordering::SeqCst)
160 .is_err()
161 {
162 return Err(BreakerOpen);
163 }
164 self.note_transition("half_open");
165 probing = true;
166 }
167 HALF_OPEN => return Err(BreakerOpen),
171 _ => {} }
173
174 match action().await {
175 Ok(v) => {
176 if probing {
177 self.state.store(CLOSED, Ordering::SeqCst);
178 self.failure_count.store(0, Ordering::SeqCst);
179 self.note_transition("closed");
180 }
181 Ok(Ok(v))
182 }
183 Err(e) => {
184 if probing {
185 self.last_state_change_nanos
189 .store(Self::now_nanos(), Ordering::Relaxed);
190 self.state.store(OPEN, Ordering::SeqCst);
191 self.note_transition("open");
192 return Ok(Err(e));
193 }
194 let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
195 if count >= self.failure_threshold
196 && self
197 .state
198 .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
199 .is_ok()
200 {
201 self.last_state_change_nanos
202 .store(Self::now_nanos(), Ordering::Relaxed);
203 self.note_transition("open");
204 }
205 Ok(Err(e))
206 }
207 }
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Result type produced by the stub actions below.
    type Outcome = Result<(), &'static str>;

    /// Stub action that always fails.
    fn failing() -> Outcome {
        Err("boom")
    }

    /// Stub action that always succeeds.
    fn succeeding() -> Outcome {
        Ok(())
    }

    /// Runs one stub action through the breaker.
    async fn run(
        breaker: &CircuitBreaker,
        action: fn() -> Outcome,
    ) -> Result<Outcome, BreakerOpen> {
        breaker.execute(|| async move { action() }).await
    }

    /// Sleeps for `millis` milliseconds on the tokio timer.
    async fn pause(millis: u64) {
        tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
    }

    #[tokio::test]
    async fn trips_open_after_threshold_and_rejects() {
        let breaker = CircuitBreaker::const_named("t", 3, 60_000);
        for _attempt in 0..3 {
            let result = run(&breaker, failing).await;
            assert!(matches!(result, Ok(Err(_))));
        }
        assert_eq!(breaker.state(), OPEN);

        // Still inside the cooldown: the action must not be admitted.
        let rejected = run(&breaker, succeeding).await;
        assert!(matches!(rejected, Err(BreakerOpen)));
    }

    #[tokio::test]
    async fn successful_probe_closes_the_breaker() {
        let breaker = CircuitBreaker::const_named("t", 1, 10);
        let tripped = run(&breaker, failing).await;
        assert!(matches!(tripped, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause(30).await;
        let probed = run(&breaker, succeeding).await;
        assert!(matches!(probed, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);

        let after_close = run(&breaker, succeeding).await;
        assert!(matches!(after_close, Ok(Ok(()))));
    }

    #[tokio::test]
    async fn failed_probe_reopens_with_fresh_cooldown() {
        let breaker = CircuitBreaker::const_named("t", 1, 20);
        let tripped = run(&breaker, failing).await;
        assert!(matches!(tripped, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause(40).await;
        let failed_probe = run(&breaker, failing).await;
        assert!(matches!(failed_probe, Ok(Err(_))));
        assert_eq!(
            breaker.state(),
            OPEN,
            "failed probe must re-open the breaker"
        );
        let rejected = run(&breaker, succeeding).await;
        assert!(matches!(rejected, Err(BreakerOpen)));

        pause(40).await;
        let second_probe = run(&breaker, succeeding).await;
        assert!(matches!(second_probe, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }

    #[tokio::test]
    async fn half_open_admits_exactly_one_probe() {
        // Leaked so the spawned task can hold a `'static` reference.
        let breaker: &'static CircuitBreaker =
            Box::leak(Box::new(CircuitBreaker::const_named("t", 1, 10)));
        let tripped = run(breaker, failing).await;
        assert!(matches!(tripped, Ok(Err(_))));
        pause(30).await;

        let gate = std::sync::Arc::new(tokio::sync::Notify::new());
        let release = gate.clone();
        let probe = tokio::spawn(async move {
            breaker
                .execute(|| {
                    let release = release.clone();
                    async move {
                        // Signal that the probe is running, then keep it in
                        // flight long enough for the second caller to arrive.
                        release.notify_one();
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        Ok::<(), &'static str>(())
                    }
                })
                .await
        });

        gate.notified().await;
        assert_eq!(breaker.state(), HALF_OPEN);
        assert!(
            matches!(run(breaker, succeeding).await, Err(BreakerOpen)),
            "second caller during the probe must be rejected"
        );

        let probe_result = probe.await.expect("join");
        assert!(matches!(probe_result, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }
}