oxios_kernel/
circuit_breaker.rs1use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
12const STATE_CLOSED: u32 = 0;
14const STATE_OPEN: u32 = 1;
16const STATE_HALF_OPEN: u32 = 2;
18
19pub struct CircuitBreaker {
32 state: AtomicU32,
33 failure_count: AtomicU32,
34 last_failure_ts: AtomicU64,
35 half_open_probe_sent: AtomicBool,
37 threshold: u32,
38 timeout: Duration,
39}
40
41impl CircuitBreaker {
42 pub fn new(threshold: u32, timeout_secs: u64) -> Self {
47 Self {
48 state: AtomicU32::new(STATE_CLOSED),
49 failure_count: AtomicU32::new(0),
50 last_failure_ts: AtomicU64::new(0),
51 half_open_probe_sent: AtomicBool::new(false),
52 threshold,
53 timeout: Duration::from_secs(timeout_secs),
54 }
55 }
56
57 pub fn is_allowed(&self) -> bool {
62 let state = self.state.load(Ordering::Acquire);
63
64 match state {
65 STATE_CLOSED => true,
66 STATE_OPEN
67 if self.should_attempt_reset() => {
69 match self.state.compare_exchange(
71 STATE_OPEN,
72 STATE_HALF_OPEN,
73 Ordering::AcqRel,
74 Ordering::Acquire,
75 ) {
76 Ok(_) => {
77 self.half_open_probe_sent.store(true, Ordering::Release);
79 true
80 }
81 Err(_) => {
82 self.half_open_probe_sent
85 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
86 .is_ok()
87 }
88 }
89 }
90 STATE_HALF_OPEN => {
91 self.half_open_probe_sent
94 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
95 .is_ok()
96 }
97 _ => false,
98 }
99 }
100
101 fn should_attempt_reset(&self) -> bool {
102 let last_ts_ms = self.last_failure_ts.load(Ordering::Acquire);
103 if last_ts_ms == 0 {
104 return true;
105 }
106 let now = SystemTime::now()
107 .duration_since(SystemTime::UNIX_EPOCH)
108 .unwrap_or_default();
109 let last = Duration::from_millis(last_ts_ms);
110 let elapsed = now.saturating_sub(last);
111 elapsed >= self.timeout
112 }
113
114 pub fn record_success(&self) {
116 self.failure_count.store(0, Ordering::Release);
117 self.state.store(STATE_CLOSED, Ordering::Release);
118 self.half_open_probe_sent.store(false, Ordering::Release);
119 }
120
121 pub fn record_failure(&self) {
123 let failures = self.failure_count.fetch_add(1, Ordering::AcqRel) + 1;
124 let now = SystemTime::now()
125 .duration_since(SystemTime::UNIX_EPOCH)
126 .map(|d| d.as_millis() as u64)
127 .unwrap_or(0);
128 self.last_failure_ts.store(now, Ordering::Release);
129
130 if failures >= self.threshold {
131 self.state.store(STATE_OPEN, Ordering::Release);
132 self.half_open_probe_sent.store(false, Ordering::Release);
133 tracing::warn!(
134 failures,
135 threshold = self.threshold,
136 "Circuit breaker OPEN — too many failures"
137 );
138 }
139 }
140
141 pub fn state(&self) -> &'static str {
143 match self.state.load(Ordering::Acquire) {
144 STATE_CLOSED => "closed",
145 STATE_OPEN => "open",
146 STATE_HALF_OPEN => "half_open",
147 _ => "unknown",
148 }
149 }
150
151 #[allow(dead_code)]
153 pub fn failure_count(&self) -> u32 {
154 self.failure_count.load(Ordering::Acquire)
155 }
156}
157
158impl Default for CircuitBreaker {
159 fn default() -> Self {
160 Self::new(5, 30)
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168
169 #[test]
170 fn test_circuit_starts_closed() {
171 let cb = CircuitBreaker::default();
172 assert!(cb.is_allowed());
173 assert_eq!(cb.state(), "closed");
174 }
175
176 #[test]
177 fn test_circuit_opens_after_threshold_failures() {
178 let cb = CircuitBreaker::new(3, 60);
179 for _ in 0..2 {
180 cb.record_failure();
181 }
182 assert!(cb.is_allowed()); cb.record_failure(); assert!(!cb.is_allowed()); assert_eq!(cb.state(), "open");
187 }
188
189 #[test]
190 fn test_circuit_closes_on_success() {
191 let cb = CircuitBreaker::default();
192 cb.record_failure();
193 cb.record_failure();
194 cb.record_failure();
195 cb.record_failure();
196 cb.record_failure(); cb.record_success();
199 assert!(cb.is_allowed());
200 assert_eq!(cb.state(), "closed");
201 }
202
203 #[test]
204 fn test_half_open_allows_only_one_probe() {
205 let cb = CircuitBreaker::new(1, 1); cb.record_failure(); assert!(!cb.is_allowed()); std::thread::sleep(std::time::Duration::from_millis(1100));
211 assert!(cb.is_allowed()); assert!(!cb.is_allowed()); assert!(!cb.is_allowed()); assert_eq!(cb.state(), "half_open");
216 }
217
218 #[test]
219 fn test_half_open_opens_on_failure() {
220 let cb = CircuitBreaker::new(1, 1);
221 cb.record_failure(); std::thread::sleep(std::time::Duration::from_millis(1100));
223 assert!(cb.is_allowed()); cb.record_failure(); assert_eq!(cb.state(), "open");
227 assert!(!cb.is_allowed()); }
230
231 #[test]
232 fn test_half_open_closes_on_success() {
233 let cb = CircuitBreaker::new(1, 1);
234 cb.record_failure(); std::thread::sleep(std::time::Duration::from_millis(1100));
236 assert!(cb.is_allowed()); cb.record_success(); assert_eq!(cb.state(), "closed");
240 assert!(cb.is_allowed()); }
242}