oxios_kernel/
a2a_circuit_breaker.rs1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
16use std::time::{Duration, Instant};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum CircuitState {
21 Closed,
23 Open,
25 HalfOpen,
27}
28
29impl CircuitState {
30 fn from_u8(v: u8) -> Self {
31 match v {
32 0 => CircuitState::Closed,
33 1 => CircuitState::Open,
34 2 => CircuitState::HalfOpen,
35 _ => CircuitState::Closed,
36 }
37 }
38}
39
40#[derive(Debug)]
45pub struct A2ACircuitBreaker {
46 state: AtomicU8,
47 failure_count: AtomicU32,
48 success_count: AtomicU32,
49 last_failure_time: AtomicU64,
50 threshold: u32,
51 reset_timeout: Duration,
52}
53
54impl A2ACircuitBreaker {
55 pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
61 Self {
62 state: AtomicU8::new(CircuitState::Closed as u8),
63 failure_count: AtomicU32::new(0),
64 success_count: AtomicU32::new(0),
65 last_failure_time: AtomicU64::new(0),
66 threshold,
67 reset_timeout: Duration::from_secs(reset_timeout_secs),
68 }
69 }
70
71 pub fn state(&self) -> CircuitState {
73 CircuitState::from_u8(self.state.load(Ordering::Relaxed))
74 }
75
76 pub fn is_allowed(&self) -> bool {
78 match self.state() {
79 CircuitState::Closed => true,
80 CircuitState::Open => {
81 let last_failure = self.last_failure_time.load(Ordering::Relaxed);
83 let now = Instant::now().elapsed().as_secs();
84 if now.saturating_sub(last_failure) > self.reset_timeout.as_secs() {
85 self.state
87 .store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
88 self.success_count.store(0, Ordering::Relaxed);
89 true
90 } else {
91 false
92 }
93 }
94 CircuitState::HalfOpen => {
95 self.success_count.load(Ordering::Relaxed) < 2
97 }
98 }
99 }
100
101 pub fn record_success(&self) {
103 match self.state() {
104 CircuitState::HalfOpen => {
105 let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
106 if successes >= 2 {
107 self.state
109 .store(CircuitState::Closed as u8, Ordering::Relaxed);
110 self.failure_count.store(0, Ordering::Relaxed);
111 tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
112 }
113 }
114 CircuitState::Closed => {
115 self.failure_count.store(0, Ordering::Relaxed);
117 }
118 CircuitState::Open => {}
119 }
120 }
121
122 pub fn record_failure(&self) {
124 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
125 self.last_failure_time
126 .store(Instant::now().elapsed().as_secs(), Ordering::Relaxed);
127
128 if failures >= self.threshold && self.state() != CircuitState::Open {
129 self.state
130 .store(CircuitState::Open as u8, Ordering::Relaxed);
131 tracing::warn!(
132 failures,
133 threshold = self.threshold,
134 "A2A circuit breaker OPEN"
135 );
136 }
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143
144 #[test]
145 fn test_initial_state_is_closed() {
146 let cb = A2ACircuitBreaker::new(3, 10);
147 assert_eq!(cb.state(), CircuitState::Closed);
148 assert!(cb.is_allowed());
149 }
150
151 #[test]
152 fn test_opens_after_threshold() {
153 let cb = A2ACircuitBreaker::new(3, 10);
154
155 cb.record_failure();
156 assert_eq!(cb.state(), CircuitState::Closed);
157
158 cb.record_failure();
159 assert_eq!(cb.state(), CircuitState::Closed);
160
161 cb.record_failure(); assert_eq!(cb.state(), CircuitState::Open);
163 assert!(!cb.is_allowed());
164 }
165
166 #[test]
167 fn test_success_resets_failure_count() {
168 let cb = A2ACircuitBreaker::new(3, 10);
169
170 cb.record_failure();
171 cb.record_failure();
172 cb.record_success(); assert_eq!(cb.failure_count.load(Ordering::Relaxed), 0);
175 }
176}