Skip to main content

oxios_kernel/
a2a_circuit_breaker.rs

1//! A2A protocol circuit breaker for delegation reliability.
2//!
3//! Prevents cascading failures when A2A delegation repeatedly fails.
4//!
5//! # Example
6//!
7//! ```
8//! use oxios_kernel::a2a_circuit_breaker::{A2ACircuitBreaker, CircuitState};
9//!
10//! let cb = A2ACircuitBreaker::new(3, 30);  // 3 failures, 30s reset
11//! assert_eq!(cb.state(), CircuitState::Closed);
12//! assert!(cb.is_allowed());
13//! ```
14
15use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
16use std::time::{Duration, Instant};
17
/// The three states a circuit breaker can be in.
///
/// The explicit discriminants are the `u8` encoding produced by `as u8`
/// and decoded by `CircuitState::from_u8`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Normal operation — requests allowed.
    Closed = 0,
    /// Too many failures — requests blocked.
    Open = 1,
    /// Testing recovery — limited requests allowed.
    HalfOpen = 2,
}

impl CircuitState {
    /// Decode a state from its `u8` form.
    ///
    /// `1` is `Open` and `2` is `HalfOpen`; every other value, including `0`
    /// and bytes that match no variant, decodes to `Closed`.
    fn from_u8(v: u8) -> Self {
        const OPEN: u8 = CircuitState::Open as u8;
        const HALF_OPEN: u8 = CircuitState::HalfOpen as u8;

        match v {
            OPEN => Self::Open,
            HALF_OPEN => Self::HalfOpen,
            _ => Self::Closed,
        }
    }
}
39
/// Circuit breaker guarding A2A delegation.
///
/// Failures are counted, and the circuit opens once the count reaches
/// `threshold`. After `reset_timeout` the breaker is meant to let a limited
/// number of trial requests through (half-open state) before closing again.
#[derive(Debug)]
pub struct A2ACircuitBreaker {
    /// Current state, held as the `u8` form of a circuit state.
    state: AtomicU8,
    /// Failures recorded since the counter was last reset.
    failure_count: AtomicU32,
    /// Successes recorded while the circuit is half-open.
    success_count: AtomicU32,
    /// Timestamp written each time a failure is recorded.
    last_failure_time: AtomicU64,
    /// Failure count at which the circuit opens.
    threshold: u32,
    /// How long the circuit stays open before recovery is tested.
    reset_timeout: Duration,
}
53
54impl A2ACircuitBreaker {
55    /// Create a new circuit breaker.
56    ///
57    /// # Arguments
58    /// * `threshold` - Number of consecutive failures before opening
59    /// * `reset_timeout_secs` - Seconds to wait before testing recovery
60    pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
61        Self {
62            state: AtomicU8::new(CircuitState::Closed as u8),
63            failure_count: AtomicU32::new(0),
64            success_count: AtomicU32::new(0),
65            last_failure_time: AtomicU64::new(0),
66            threshold,
67            reset_timeout: Duration::from_secs(reset_timeout_secs),
68        }
69    }
70
71    /// Current circuit state.
72    pub fn state(&self) -> CircuitState {
73        CircuitState::from_u8(self.state.load(Ordering::Relaxed))
74    }
75
76    /// Whether a request is allowed through the circuit.
77    pub fn is_allowed(&self) -> bool {
78        match self.state() {
79            CircuitState::Closed => true,
80            CircuitState::Open => {
81                // Check if reset timeout has passed
82                let last_failure = self.last_failure_time.load(Ordering::Relaxed);
83                let now = Instant::now().elapsed().as_secs();
84                if now.saturating_sub(last_failure) > self.reset_timeout.as_secs() as u64 {
85                    // Transition to half-open
86                    self.state.store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
87                    self.success_count.store(0, Ordering::Relaxed);
88                    true
89                } else {
90                    false
91                }
92            }
93            CircuitState::HalfOpen => {
94                // Allow limited requests (up to 2)
95                self.success_count.load(Ordering::Relaxed) < 2
96            }
97        }
98    }
99
100    /// Record a successful request.
101    pub fn record_success(&self) {
102        match self.state() {
103            CircuitState::HalfOpen => {
104                let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
105                if successes >= 2 {
106                    // Recovery successful → closed
107                    self.state.store(CircuitState::Closed as u8, Ordering::Relaxed);
108                    self.failure_count.store(0, Ordering::Relaxed);
109                    tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
110                }
111            }
112            CircuitState::Closed => {
113                // Reset failure count on success
114                self.failure_count.store(0, Ordering::Relaxed);
115            }
116            CircuitState::Open => {}
117        }
118    }
119
120    /// Record a failed request.
121    pub fn record_failure(&self) {
122        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
123        self.last_failure_time.store(
124            Instant::now().elapsed().as_secs(),
125            Ordering::Relaxed,
126        );
127
128        if failures >= self.threshold && self.state() != CircuitState::Open {
129            self.state.store(CircuitState::Open as u8, Ordering::Relaxed);
130            tracing::warn!(
131                failures,
132                threshold = self.threshold,
133                "A2A circuit breaker OPEN"
134            );
135        }
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Breaker shared by the tests: opens at 3 failures, 10-second reset.
    fn breaker() -> A2ACircuitBreaker {
        A2ACircuitBreaker::new(3, 10)
    }

    /// A freshly built breaker is closed and lets requests through.
    #[test]
    fn test_initial_state_is_closed() {
        let fresh = breaker();
        assert_eq!(cb_state(&fresh), CircuitState::Closed);
        assert!(fresh.is_allowed());
    }

    /// The circuit stays closed below the threshold and opens on reaching it.
    #[test]
    fn test_opens_after_threshold() {
        let guarded = breaker();

        // The first two failures are below the threshold of three.
        for _ in 0..2 {
            guarded.record_failure();
            assert_eq!(cb_state(&guarded), CircuitState::Closed);
        }

        // The third failure reaches the threshold.
        guarded.record_failure();
        assert_eq!(cb_state(&guarded), CircuitState::Open);
        assert!(!guarded.is_allowed());
    }

    /// A success while closed clears the accumulated failure count.
    #[test]
    fn test_success_resets_failure_count() {
        let guarded = breaker();

        guarded.record_failure();
        guarded.record_failure();
        guarded.record_success();

        let remaining = guarded.failure_count.load(Ordering::Relaxed);
        assert_eq!(remaining, 0);
    }

    /// Read the breaker's current state.
    fn cb_state(cb: &A2ACircuitBreaker) -> CircuitState {
        cb.state()
    }
}