1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6
7#[derive(Debug, Clone)]
9enum CircuitState {
10 Closed {
11 failures: u32,
12 },
13 Open {
14 opened_at: DateTime<Utc>,
15 failures: u32,
16 },
17 HalfOpen {
18 failures: u32,
19 },
20}
21
/// Verdict returned by `CircuitBreaker::check` for one agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CanDeliver {
    /// The circuit is closed (or the agent is untracked): deliver normally.
    Yes,
    /// The circuit is open and its cooldown has not elapsed: do not deliver.
    No,
    /// The circuit is half-open: a delivery may be attempted; a recorded
    /// success closes the circuit and a recorded failure reopens it.
    Probe,
}
29
30#[derive(Debug, Clone, Serialize)]
32pub struct CircuitStatus {
33 pub agent_id: String,
34 pub state: String,
35 pub failures: u32,
36 pub opened_at: Option<String>,
37}
38
/// Thresholds applied to every agent tracked by a `CircuitBreaker`.
#[derive(Debug, Clone)]
pub struct CircuitConfig {
    /// Failure count at which a closed circuit opens.
    pub failure_threshold: u32,
    /// Seconds an open circuit waits before `check` allows a probe.
    pub cooldown_seconds: u64,
}

impl Default for CircuitConfig {
    /// Open after 5 failures; allow a probe after a 60-second cooldown.
    fn default() -> Self {
        let failure_threshold = 5;
        let cooldown_seconds = 60;
        Self {
            failure_threshold,
            cooldown_seconds,
        }
    }
}
54
55pub struct CircuitBreaker {
60 states: Arc<Mutex<HashMap<String, CircuitState>>>,
61 config: CircuitConfig,
62}
63
64impl CircuitBreaker {
65 pub fn new(config: CircuitConfig) -> Self {
66 Self {
67 states: Arc::new(Mutex::new(HashMap::new())),
68 config,
69 }
70 }
71
72 pub fn with_defaults() -> Self {
73 Self::new(CircuitConfig::default())
74 }
75
76 pub fn check(&self, agent_id: &str) -> CanDeliver {
78 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
79 let state = states
80 .entry(agent_id.to_string())
81 .or_insert_with(|| CircuitState::Closed { failures: 0 });
82
83 match state {
84 CircuitState::Closed { .. } => CanDeliver::Yes,
85 CircuitState::Open { opened_at, .. } => {
86 let elapsed = (Utc::now() - *opened_at).num_seconds();
87 if elapsed >= self.config.cooldown_seconds as i64 {
88 let failures = match state {
89 CircuitState::Open { failures, .. } => *failures,
90 _ => 0,
91 };
92 *state = CircuitState::HalfOpen { failures };
93 CanDeliver::Probe
94 } else {
95 CanDeliver::No
96 }
97 }
98 CircuitState::HalfOpen { .. } => CanDeliver::Probe,
99 }
100 }
101
102 pub fn record_failure(&self, agent_id: &str) {
104 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
105 let state = states
106 .entry(agent_id.to_string())
107 .or_insert_with(|| CircuitState::Closed { failures: 0 });
108
109 match state {
110 CircuitState::Closed { failures } => {
111 let new_failures = *failures + 1;
112 if new_failures >= self.config.failure_threshold {
113 *state = CircuitState::Open {
114 opened_at: Utc::now(),
115 failures: new_failures,
116 };
117 } else {
118 *state = CircuitState::Closed {
119 failures: new_failures,
120 };
121 }
122 }
123 CircuitState::Open { failures, .. } => {
124 *state = CircuitState::Open {
125 opened_at: Utc::now(),
126 failures: *failures + 1,
127 };
128 }
129 CircuitState::HalfOpen { failures } => {
130 *state = CircuitState::Open {
131 opened_at: Utc::now(),
132 failures: *failures + 1,
133 };
134 }
135 }
136 }
137
138 pub fn record_success(&self, agent_id: &str) {
140 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
141 if let Some(state) = states.get_mut(agent_id) {
142 if matches!(state, CircuitState::HalfOpen { .. }) {
143 *state = CircuitState::Closed { failures: 0 };
144 }
145 }
146 }
147
148 pub fn reset(&self, agent_id: &str) {
150 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
151 states.insert(agent_id.to_string(), CircuitState::Closed { failures: 0 });
152 }
153
154 pub fn remove(&self, agent_id: &str) {
156 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
157 states.remove(agent_id);
158 }
159
160 pub fn evict_stale(&self) -> usize {
163 let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
164 let cutoff = Utc::now() - chrono::Duration::hours(1);
165 let before = states.len();
166 states.retain(|_, state| match state {
167 CircuitState::Open { opened_at, .. } => *opened_at > cutoff,
168 _ => true,
169 });
170 before - states.len()
171 }
172
173 pub fn get_state(&self, agent_id: &str) -> CircuitStatus {
175 let states = self.states.lock().unwrap_or_else(|e| e.into_inner());
176 match states.get(agent_id) {
177 None => CircuitStatus {
178 agent_id: agent_id.to_string(),
179 state: "closed".to_string(),
180 failures: 0,
181 opened_at: None,
182 },
183 Some(CircuitState::Closed { failures }) => CircuitStatus {
184 agent_id: agent_id.to_string(),
185 state: "closed".to_string(),
186 failures: *failures,
187 opened_at: None,
188 },
189 Some(CircuitState::Open {
190 opened_at,
191 failures,
192 }) => CircuitStatus {
193 agent_id: agent_id.to_string(),
194 state: "open".to_string(),
195 failures: *failures,
196 opened_at: Some(opened_at.to_rfc3339()),
197 },
198 Some(CircuitState::HalfOpen { failures }) => CircuitStatus {
199 agent_id: agent_id.to_string(),
200 state: "half_open".to_string(),
201 failures: *failures,
202 opened_at: None,
203 },
204 }
205 }
206
207 pub fn list_active(&self) -> Vec<CircuitStatus> {
209 let states = self.states.lock().unwrap_or_else(|e| e.into_inner());
210 let mut result = Vec::new();
211 for (agent_id, state) in states.iter() {
212 match state {
213 CircuitState::Closed { failures: 0 } => continue,
214 CircuitState::Closed { failures } => result.push(CircuitStatus {
215 agent_id: agent_id.clone(),
216 state: "closed".to_string(),
217 failures: *failures,
218 opened_at: None,
219 }),
220 CircuitState::Open {
221 opened_at,
222 failures,
223 } => result.push(CircuitStatus {
224 agent_id: agent_id.clone(),
225 state: "open".to_string(),
226 failures: *failures,
227 opened_at: Some(opened_at.to_rfc3339()),
228 }),
229 CircuitState::HalfOpen { failures } => result.push(CircuitStatus {
230 agent_id: agent_id.clone(),
231 state: "half_open".to_string(),
232 failures: *failures,
233 opened_at: None,
234 }),
235 }
236 }
237 result
238 }
239}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Failure threshold used by every test breaker.
    const THRESHOLD: u32 = 3;

    /// Breaker that opens after `THRESHOLD` failures and cools down for 60 s.
    fn test_breaker() -> CircuitBreaker {
        CircuitBreaker::new(CircuitConfig {
            failure_threshold: THRESHOLD,
            cooldown_seconds: 60,
        })
    }

    /// Records exactly `THRESHOLD` failures, which opens a closed circuit.
    fn trip(cb: &CircuitBreaker, agent_id: &str) {
        for _ in 0..THRESHOLD {
            cb.record_failure(agent_id);
        }
    }

    /// Writes `state` for `agent_id` directly. This lets tests reach states
    /// such as an expired cooldown without sleeping.
    fn force_state(cb: &CircuitBreaker, agent_id: &str, state: CircuitState) {
        let mut states = cb.states.lock().unwrap_or_else(|e| e.into_inner());
        states.insert(agent_id.to_string(), state);
    }

    /// An `Open` state entered `secs_ago` seconds in the past.
    fn open_since(secs_ago: i64) -> CircuitState {
        CircuitState::Open {
            opened_at: Utc::now() - chrono::Duration::seconds(secs_ago),
            failures: THRESHOLD,
        }
    }

    #[test]
    fn starts_closed() {
        let cb = test_breaker();
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn unknown_agent_reports_closed() {
        let cb = test_breaker();
        let status = cb.get_state("ghost");
        assert_eq!(status.agent_id, "ghost");
        assert_eq!(status.state, "closed");
        assert_eq!(status.failures, 0);
        assert!(status.opened_at.is_none());
        assert!(cb.list_active().is_empty());
    }

    #[test]
    fn stays_closed_below_threshold() {
        let cb = test_breaker();
        for _ in 1..THRESHOLD {
            cb.record_failure("agent1");
        }
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn opens_at_threshold() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        assert_eq!(cb.get_state("agent1").state, "open");
    }

    #[test]
    fn failure_while_open_stays_open_and_counts() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        cb.record_failure("agent1");
        let status = cb.get_state("agent1");
        assert_eq!(status.state, "open");
        assert_eq!(status.failures, THRESHOLD + 1);
        assert!(status.opened_at.is_some());
    }

    #[test]
    fn half_open_after_cooldown() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        force_state(&cb, "agent1", open_since(120));
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
    }

    #[test]
    fn half_open_keeps_probing() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: THRESHOLD });
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
    }

    #[test]
    fn half_open_success_closes() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: THRESHOLD });
        cb.record_success("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn half_open_failure_reopens() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: THRESHOLD });
        cb.record_failure("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);
    }

    #[test]
    fn heartbeat_resets_closed() {
        let cb = test_breaker();
        for _ in 1..THRESHOLD {
            cb.record_failure("agent1");
        }
        let status = cb.get_state("agent1");
        assert_eq!(status.state, "closed");
        assert_eq!(status.failures, THRESHOLD - 1);

        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn heartbeat_resets_open() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn heartbeat_resets_half_open() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: THRESHOLD });
        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn different_agents_independent() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        assert_eq!(cb.check("agent2"), CanDeliver::Yes);
    }

    #[test]
    fn remove_clears_state() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        cb.remove("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn list_active_skips_healthy() {
        let cb = test_breaker();
        trip(&cb, "agent1");
        // A tracked entry with zero failures must not be listed.
        cb.reset("agent2");
        let active = cb.list_active();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].agent_id, "agent1");
    }

    #[test]
    fn list_active_reports_every_unhealthy_state() {
        let cb = test_breaker();
        cb.record_failure("closed");
        trip(&cb, "open");
        force_state(&cb, "half", CircuitState::HalfOpen { failures: THRESHOLD });
        cb.reset("healthy");

        let mut active = cb.list_active();
        active.sort_by(|a, b| a.agent_id.cmp(&b.agent_id));
        let summary: Vec<(&str, &str, u32, bool)> = active
            .iter()
            .map(|s| {
                (
                    s.agent_id.as_str(),
                    s.state.as_str(),
                    s.failures,
                    s.opened_at.is_some(),
                )
            })
            .collect();
        assert_eq!(
            summary,
            vec![
                ("closed", "closed", 1, false),
                ("half", "half_open", THRESHOLD, false),
                ("open", "open", THRESHOLD, true),
            ]
        );
    }

    #[test]
    fn evict_stale_drops_only_long_open_circuits() {
        let cb = test_breaker();
        force_state(&cb, "old", open_since(2 * 60 * 60));
        trip(&cb, "fresh");
        force_state(&cb, "probing", CircuitState::HalfOpen { failures: THRESHOLD });

        assert_eq!(cb.evict_stale(), 1);
        assert_eq!(cb.check("old"), CanDeliver::Yes);
        assert_eq!(cb.check("fresh"), CanDeliver::No);
        assert_eq!(cb.get_state("probing").state, "half_open");
    }

    #[test]
    fn full_lifecycle() {
        let cb = test_breaker();
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);

        trip(&cb, "agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        force_state(&cb, "agent1", open_since(120));
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);

        cb.record_success("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }
}