oxios_kernel/a2a/
circuit_breaker.rs1use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a moment earlier than the epoch, the error is
/// swallowed and `0` is returned instead, so callers never see a failure.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
28
/// Operating mode of the circuit breaker.
///
/// The discriminants are written out because the breaker stores the state as
/// a raw `u8` (`CircuitState::X as u8`) and decodes it again with
/// `CircuitState::from_u8`; the two mappings must stay in agreement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Calls are allowed.
    Closed = 0,
    /// Calls are rejected until the reset timeout has elapsed.
    Open = 1,
    /// Calls are allowed on a trial basis while recovery is being verified.
    HalfOpen = 2,
}
39
40impl CircuitState {
41 fn from_u8(v: u8) -> Self {
42 match v {
43 0 => CircuitState::Closed,
44 1 => CircuitState::Open,
45 2 => CircuitState::HalfOpen,
46 _ => CircuitState::Closed,
47 }
48 }
49}
50
/// Circuit breaker whose mutable state lives entirely in atomics, so every
/// method can be called through a shared `&self` reference.
#[derive(Debug)]
pub struct A2ACircuitBreaker {
    /// Current [`CircuitState`], stored as its `u8` discriminant.
    state: AtomicU8,
    /// Failures recorded since the counter was last reset to zero.
    failure_count: AtomicU32,
    /// Successes recorded while in the half-open state.
    success_count: AtomicU32,
    /// Time of the most recent failure, in seconds since the Unix epoch
    /// (`0` until the first failure is recorded).
    last_failure_time: AtomicU64,
    /// Failure count at which the breaker opens.
    threshold: u32,
    /// How long the breaker stays open before calls are allowed again.
    reset_timeout: Duration,
}
64
65impl A2ACircuitBreaker {
66 pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
72 Self {
73 state: AtomicU8::new(CircuitState::Closed as u8),
74 failure_count: AtomicU32::new(0),
75 success_count: AtomicU32::new(0),
76 last_failure_time: AtomicU64::new(0),
77 threshold,
78 reset_timeout: Duration::from_secs(reset_timeout_secs),
79 }
80 }
81
82 pub fn state(&self) -> CircuitState {
84 CircuitState::from_u8(self.state.load(Ordering::Relaxed))
85 }
86
87 pub fn is_allowed(&self) -> bool {
89 match self.state() {
90 CircuitState::Closed => true,
91 CircuitState::Open => {
92 let last_failure = self.last_failure_time.load(Ordering::Relaxed);
94 let elapsed = now_epoch_secs().saturating_sub(last_failure);
95 if elapsed > self.reset_timeout.as_secs() {
96 self.state
98 .store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
99 self.success_count.store(0, Ordering::Relaxed);
100 true
101 } else {
102 false
103 }
104 }
105 CircuitState::HalfOpen => {
106 self.success_count.load(Ordering::Relaxed) < 2
108 }
109 }
110 }
111
112 pub fn record_success(&self) {
114 match self.state() {
115 CircuitState::HalfOpen => {
116 let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
117 if successes >= 2 {
118 self.state
120 .store(CircuitState::Closed as u8, Ordering::Relaxed);
121 self.failure_count.store(0, Ordering::Relaxed);
122 tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
123 }
124 }
125 CircuitState::Closed => {
126 self.failure_count.store(0, Ordering::Relaxed);
128 }
129 CircuitState::Open => {}
130 }
131 }
132
133 pub fn record_failure(&self) {
135 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
136 self.last_failure_time
137 .store(now_epoch_secs(), Ordering::Relaxed);
138
139 if failures >= self.threshold && self.state() != CircuitState::Open {
140 self.state
141 .store(CircuitState::Open as u8, Ordering::Relaxed);
142 tracing::warn!(
143 failures,
144 threshold = self.threshold,
145 "A2A circuit breaker OPEN"
146 );
147 }
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built breaker starts closed and lets calls through.
    #[test]
    fn test_initial_state_is_closed() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        assert_eq!(breaker.state(), CircuitState::Closed);
        assert!(breaker.is_allowed());
    }

    /// The breaker stays closed below the threshold and opens on reaching it.
    #[test]
    fn test_opens_after_threshold() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        // The first two failures are below the threshold of three.
        for _ in 0..2 {
            breaker.record_failure();
            assert_eq!(breaker.state(), CircuitState::Closed);
        }

        // The third failure reaches the threshold.
        breaker.record_failure();
        assert_eq!(breaker.state(), CircuitState::Open);
        assert!(!breaker.is_allowed());
    }

    /// A success while closed clears the accumulated failure count.
    #[test]
    fn test_success_resets_failure_count() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        for _ in 0..2 {
            breaker.record_failure();
        }
        breaker.record_success();

        let remaining_failures = breaker.failure_count.load(Ordering::Relaxed);
        assert_eq!(remaining_failures, 0);
    }
}