arcly_http/resilience/
mod.rs1pub use arcly_http_macros::circuit_breaker;
9
10pub mod bulkhead;
11pub mod distributed_rate_limit;
12pub mod dlock;
13pub mod rate_limit;
14pub mod timeout;
15pub use bulkhead::Bulkhead;
16pub use distributed_rate_limit::{
17 DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
18};
19pub use dlock::{DLockBackend, DistributedLock, LockGuard};
20pub use rate_limit::RateLimit;
21
22use std::error::Error as StdError;
23use std::fmt;
24use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
25use std::time::Duration;
26
27use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
28
29const CLOSED: u8 = 0;
30const OPEN: u8 = 1;
31const HALF_OPEN: u8 = 2;
32
33#[derive(Debug, Clone)]
40pub struct BreakerOpen;
41
42impl fmt::Display for BreakerOpen {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "circuit breaker open")
45 }
46}
47impl StdError for BreakerOpen {}
48impl HttpError for BreakerOpen {
49 fn problem(&self) -> ProblemDetails {
50 ServiceUnavailable::new("circuit breaker open").problem()
51 }
52}
53
54impl From<BreakerOpen> for ServiceUnavailable {
55 fn from(_: BreakerOpen) -> Self {
56 ServiceUnavailable::new("circuit breaker open")
57 }
58}
59
60pub struct CircuitBreaker {
61 state: AtomicU8,
62 failure_threshold: u32,
63 failure_count: AtomicU32,
64 cooldown_millis: u64,
65 last_state_change_nanos: AtomicU64,
70}
71
72impl CircuitBreaker {
73 pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
75 Self::const_new(failure_threshold, cooldown.as_millis() as u64)
76 }
77
78 pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
80 Self {
81 state: AtomicU8::new(CLOSED),
82 failure_threshold,
83 failure_count: AtomicU32::new(0),
84 cooldown_millis,
85 last_state_change_nanos: AtomicU64::new(0),
86 }
87 }
88
89 #[inline]
90 pub fn state(&self) -> u8 {
91 self.state.load(Ordering::Relaxed)
92 }
93
94 fn now_nanos() -> u64 {
95 use std::sync::OnceLock;
98 use std::time::Instant;
99 static EPOCH: OnceLock<Instant> = OnceLock::new();
100 let epoch = EPOCH.get_or_init(Instant::now);
101 epoch.elapsed().as_nanos() as u64
102 }
103
104 pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
108 where
109 F: FnMut() -> Fut,
110 Fut: std::future::Future<Output = Result<T, E>>,
111 {
112 let current = self.state.load(Ordering::Relaxed);
113
114 if current == OPEN {
115 let elapsed_ms = (Self::now_nanos()
116 .saturating_sub(self.last_state_change_nanos.load(Ordering::Relaxed)))
117 / 1_000_000;
118 if elapsed_ms > self.cooldown_millis {
119 let _ = self.state.compare_exchange(
120 OPEN,
121 HALF_OPEN,
122 Ordering::SeqCst,
123 Ordering::Relaxed,
124 );
125 } else {
126 return Err(BreakerOpen);
127 }
128 }
129
130 match action().await {
131 Ok(v) => {
132 if self.state.load(Ordering::SeqCst) == HALF_OPEN {
133 self.state.store(CLOSED, Ordering::SeqCst);
134 self.failure_count.store(0, Ordering::SeqCst);
135 }
136 Ok(Ok(v))
137 }
138 Err(e) => {
139 let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
140 if count >= self.failure_threshold
141 && self
142 .state
143 .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
144 .is_ok()
145 {
146 self.last_state_change_nanos
147 .store(Self::now_nanos(), Ordering::Relaxed);
148 }
149 Ok(Err(e))
150 }
151 }
152 }
153}