oxios_kernel/
circuit_breaker.rs1use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
/// Numeric encoding of the "closed" state: requests flow normally.
const STATE_CLOSED: u32 = 0;
/// Numeric encoding of the "open" state: requests are rejected until the timeout elapses.
const STATE_OPEN: u32 = 1;
/// Numeric encoding of the "half-open" state: a single probe request is let through.
const STATE_HALF_OPEN: u32 = 2;
18
/// Lock-free circuit breaker whose whole state lives in atomics, so it can be
/// shared by reference between threads.
///
/// The breaker is closed while failures stay below `threshold`, opens once the
/// threshold is reached, and after `timeout` lets a single probe request
/// through (half-open) to decide whether to close again.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state, stored as one of the `STATE_*` constants.
    state: AtomicU32,
    /// Failures recorded since the last success.
    failure_count: AtomicU32,
    /// Wall-clock time of the most recent failure in milliseconds since the
    /// Unix epoch; `0` means no failure timestamp has been recorded.
    last_failure_ts: AtomicU64,
    /// Set once the single half-open probe has been handed out.
    half_open_probe_sent: AtomicBool,
    /// Failure count at which the circuit opens.
    threshold: u32,
    /// How long the circuit stays open before a probe is allowed.
    timeout: Duration,
}
40
41impl CircuitBreaker {
42 pub fn new(threshold: u32, timeout_secs: u64) -> Self {
47 Self {
48 state: AtomicU32::new(STATE_CLOSED),
49 failure_count: AtomicU32::new(0),
50 last_failure_ts: AtomicU64::new(0),
51 half_open_probe_sent: AtomicBool::new(false),
52 threshold,
53 timeout: Duration::from_secs(timeout_secs),
54 }
55 }
56
57 pub fn is_allowed(&self) -> bool {
62 let state = self.state.load(Ordering::Acquire);
63
64 match state {
65 STATE_CLOSED => true,
66 STATE_OPEN
67 if self.should_attempt_reset() => {
69 match self.state.compare_exchange(
71 STATE_OPEN,
72 STATE_HALF_OPEN,
73 Ordering::AcqRel,
74 Ordering::Acquire,
75 ) {
76 Ok(_) => {
77 self.half_open_probe_sent.store(true, Ordering::Release);
79 true
80 }
81 Err(_) => {
82 self.half_open_probe_sent
85 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
86 .is_ok()
87 }
88 }
89 }
90 STATE_HALF_OPEN => {
91 self.half_open_probe_sent
94 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
95 .is_ok()
96 }
97 _ => false,
98 }
99 }
100
101 fn should_attempt_reset(&self) -> bool {
102 let last_ts_ms = self.last_failure_ts.load(Ordering::Acquire);
103 if last_ts_ms == 0 {
104 return true;
105 }
106 let now = SystemTime::now()
107 .duration_since(SystemTime::UNIX_EPOCH)
108 .unwrap_or_default();
109 let last = Duration::from_millis(last_ts_ms);
110 let elapsed = now.saturating_sub(last);
111 elapsed >= self.timeout
112 }
113
114 pub fn record_success(&self) {
116 self.failure_count.store(0, Ordering::Release);
117 self.state.store(STATE_CLOSED, Ordering::Release);
118 self.half_open_probe_sent.store(false, Ordering::Release);
119 crate::metrics::get_metrics()
120 .llm_circuit_breaker_state
121 .set(0.0);
122 }
123
124 pub fn record_failure(&self) {
126 let failures = self.failure_count.fetch_add(1, Ordering::AcqRel) + 1;
127 let now = SystemTime::now()
128 .duration_since(SystemTime::UNIX_EPOCH)
129 .map(|d| d.as_millis() as u64)
130 .unwrap_or(0);
131 self.last_failure_ts.store(now, Ordering::Release);
132
133 if failures >= self.threshold {
134 self.state.store(STATE_OPEN, Ordering::Release);
135 self.half_open_probe_sent.store(false, Ordering::Release);
136 tracing::warn!(
137 failures,
138 threshold = self.threshold,
139 "Circuit breaker OPEN — too many failures"
140 );
141 crate::metrics::get_metrics()
142 .llm_circuit_breaker_state
143 .set(1.0);
144 }
145 }
146
147 pub fn state(&self) -> &'static str {
149 match self.state.load(Ordering::Acquire) {
150 STATE_CLOSED => "closed",
151 STATE_OPEN => "open",
152 STATE_HALF_OPEN => "half_open",
153 _ => "unknown",
154 }
155 }
156
157 #[allow(dead_code)]
159 pub fn failure_count(&self) -> u32 {
160 self.failure_count.load(Ordering::Acquire)
161 }
162}
163
164impl Default for CircuitBreaker {
165 fn default() -> Self {
166 Self::new(5, 30)
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Backdates the last-failure timestamp to 1 ms after the Unix epoch so the
    /// open-state timeout counts as elapsed without the test having to sleep.
    fn expire_timeout(cb: &CircuitBreaker) {
        cb.last_failure_ts
            .store(1, std::sync::atomic::Ordering::Release);
    }

    #[test]
    fn test_circuit_starts_closed() {
        let cb = CircuitBreaker::default();
        assert!(cb.is_allowed());
        assert_eq!(cb.state(), "closed");
    }

    #[test]
    fn test_circuit_opens_after_threshold_failures() {
        let cb = CircuitBreaker::new(3, 60);
        for _ in 0..2 {
            cb.record_failure();
        }
        // Still below the threshold.
        assert!(cb.is_allowed());

        cb.record_failure();
        assert!(!cb.is_allowed());
        assert_eq!(cb.state(), "open");
    }

    #[test]
    fn test_circuit_closes_on_success() {
        let cb = CircuitBreaker::default();
        for _ in 0..5 {
            cb.record_failure();
        }

        cb.record_success();
        assert!(cb.is_allowed());
        assert_eq!(cb.state(), "closed");
    }

    #[test]
    fn test_half_open_allows_only_one_probe() {
        // A long timeout keeps the "still open" assertion immune to scheduler stalls.
        let cb = CircuitBreaker::new(1, 60);
        cb.record_failure();
        assert!(!cb.is_allowed());

        expire_timeout(&cb);

        // The first caller becomes the probe; everyone else is rejected.
        assert!(cb.is_allowed());
        assert!(!cb.is_allowed());
        assert!(!cb.is_allowed());
        assert_eq!(cb.state(), "half_open");
    }

    #[test]
    fn test_half_open_opens_on_failure() {
        let cb = CircuitBreaker::new(1, 60);
        cb.record_failure();
        expire_timeout(&cb);

        assert!(cb.is_allowed());
        // The failed probe re-stamps the failure time and re-opens the circuit.
        cb.record_failure();
        assert_eq!(cb.state(), "open");
        assert!(!cb.is_allowed());
    }

    #[test]
    fn test_half_open_closes_on_success() {
        let cb = CircuitBreaker::new(1, 60);
        cb.record_failure();
        expire_timeout(&cb);

        assert!(cb.is_allowed());
        cb.record_success();
        assert_eq!(cb.state(), "closed");
        assert!(cb.is_allowed());
    }
}