oxios_kernel/
a2a_circuit_breaker.rs1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
16use std::time::{Duration, Instant};
17
/// The three states a circuit breaker can be in.
///
/// The discriminants are pinned explicitly because the state is stored as a
/// raw `u8` (via `as u8`) and decoded again with [`CircuitState::from_u8`];
/// fixing them here keeps both directions in agreement even if variants are
/// later reordered or added.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CircuitState {
    /// Encoded as `0`.
    Closed = 0,
    /// Encoded as `1`.
    Open = 1,
    /// Encoded as `2`.
    HalfOpen = 2,
}

impl CircuitState {
    /// Decodes a state previously encoded with `as u8`.
    ///
    /// Any value that is not the encoding of `Open` or `HalfOpen` — including
    /// out-of-range bytes — decodes to `Closed`.
    fn from_u8(v: u8) -> Self {
        // Derive the patterns from the enum itself so the decoder cannot drift
        // away from the encoding.
        const OPEN: u8 = CircuitState::Open as u8;
        const HALF_OPEN: u8 = CircuitState::HalfOpen as u8;

        match v {
            OPEN => CircuitState::Open,
            HALF_OPEN => CircuitState::HalfOpen,
            _ => CircuitState::Closed,
        }
    }
}
39
40#[derive(Debug)]
45pub struct A2ACircuitBreaker {
46 state: AtomicU8,
47 failure_count: AtomicU32,
48 success_count: AtomicU32,
49 last_failure_time: AtomicU64,
50 threshold: u32,
51 reset_timeout: Duration,
52}
53
54impl A2ACircuitBreaker {
55 pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
61 Self {
62 state: AtomicU8::new(CircuitState::Closed as u8),
63 failure_count: AtomicU32::new(0),
64 success_count: AtomicU32::new(0),
65 last_failure_time: AtomicU64::new(0),
66 threshold,
67 reset_timeout: Duration::from_secs(reset_timeout_secs),
68 }
69 }
70
71 pub fn state(&self) -> CircuitState {
73 CircuitState::from_u8(self.state.load(Ordering::Relaxed))
74 }
75
76 pub fn is_allowed(&self) -> bool {
78 match self.state() {
79 CircuitState::Closed => true,
80 CircuitState::Open => {
81 let last_failure = self.last_failure_time.load(Ordering::Relaxed);
83 let now = Instant::now().elapsed().as_secs();
84 if now.saturating_sub(last_failure) > self.reset_timeout.as_secs() as u64 {
85 self.state.store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
87 self.success_count.store(0, Ordering::Relaxed);
88 true
89 } else {
90 false
91 }
92 }
93 CircuitState::HalfOpen => {
94 self.success_count.load(Ordering::Relaxed) < 2
96 }
97 }
98 }
99
100 pub fn record_success(&self) {
102 match self.state() {
103 CircuitState::HalfOpen => {
104 let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
105 if successes >= 2 {
106 self.state.store(CircuitState::Closed as u8, Ordering::Relaxed);
108 self.failure_count.store(0, Ordering::Relaxed);
109 tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
110 }
111 }
112 CircuitState::Closed => {
113 self.failure_count.store(0, Ordering::Relaxed);
115 }
116 CircuitState::Open => {}
117 }
118 }
119
120 pub fn record_failure(&self) {
122 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
123 self.last_failure_time.store(
124 Instant::now().elapsed().as_secs(),
125 Ordering::Relaxed,
126 );
127
128 if failures >= self.threshold && self.state() != CircuitState::Open {
129 self.state.store(CircuitState::Open as u8, Ordering::Relaxed);
130 tracing::warn!(
131 failures,
132 threshold = self.threshold,
133 "A2A circuit breaker OPEN"
134 );
135 }
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed breaker is closed and lets calls through.
    #[test]
    fn test_initial_state_is_closed() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        assert_eq!(CircuitState::Closed, breaker.state());
        assert!(breaker.is_allowed());
    }

    /// With a threshold of 3 the breaker stays closed for the first two
    /// failures and opens on the third, after which calls are rejected.
    #[test]
    fn test_opens_after_threshold() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        // Failures below the threshold must not trip the breaker.
        for _ in 0..2 {
            breaker.record_failure();
            assert_eq!(CircuitState::Closed, breaker.state());
        }

        // The third failure reaches the threshold.
        breaker.record_failure();
        assert_eq!(CircuitState::Open, breaker.state());
        assert!(!breaker.is_allowed());
    }

    /// A success recorded while closed clears the accumulated failures.
    #[test]
    fn test_success_resets_failure_count() {
        let breaker = A2ACircuitBreaker::new(3, 10);

        (0..2).for_each(|_| breaker.record_failure());
        breaker.record_success();

        let remaining_failures = breaker.failure_count.load(Ordering::Relaxed);
        assert_eq!(remaining_failures, 0);
    }
}