Skip to main content

oxios_kernel/
a2a_circuit_breaker.rs

1//! A2A protocol circuit breaker for delegation reliability.
2//!
3//! Prevents cascading failures when A2A delegation repeatedly fails.
4//!
5//! # Example
6//!
7//! ```
8//! use oxios_kernel::a2a_circuit_breaker::{A2ACircuitBreaker, CircuitState};
9//!
10//! let cb = A2ACircuitBreaker::new(3, 30);  // 3 failures, 30s reset
11//! assert_eq!(cb.state(), CircuitState::Closed);
12//! assert!(cb.is_allowed());
13//! ```
14
15use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
16use std::time::{Duration, Instant};
17
/// State of an [`A2ACircuitBreaker`].
///
/// Discriminants are spelled out because the breaker stores the state as a
/// raw `u8` inside an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Requests flow normally.
    Closed = 0,
    /// Failure threshold reached; requests are rejected.
    Open = 1,
    /// Probing for recovery; a limited number of requests are let through.
    HalfOpen = 2,
}

impl CircuitState {
    /// Decode a discriminant previously produced by `state as u8`.
    ///
    /// Any value that is not a known discriminant decodes as
    /// [`CircuitState::Closed`].
    fn from_u8(raw: u8) -> Self {
        const OPEN: u8 = CircuitState::Open as u8;
        const HALF_OPEN: u8 = CircuitState::HalfOpen as u8;

        match raw {
            OPEN => Self::Open,
            HALF_OPEN => Self::HalfOpen,
            _ => Self::Closed,
        }
    }
}
39
40/// A2A delegation circuit breaker.
41///
42/// Tracks failures and opens the circuit when threshold is exceeded.
43/// After timeout, allows limited test requests (half-open state).
44#[derive(Debug)]
45pub struct A2ACircuitBreaker {
46    state: AtomicU8,
47    failure_count: AtomicU32,
48    success_count: AtomicU32,
49    last_failure_time: AtomicU64,
50    threshold: u32,
51    reset_timeout: Duration,
52}
53
54impl A2ACircuitBreaker {
55    /// Create a new circuit breaker.
56    ///
57    /// # Arguments
58    /// * `threshold` - Number of consecutive failures before opening
59    /// * `reset_timeout_secs` - Seconds to wait before testing recovery
60    pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
61        Self {
62            state: AtomicU8::new(CircuitState::Closed as u8),
63            failure_count: AtomicU32::new(0),
64            success_count: AtomicU32::new(0),
65            last_failure_time: AtomicU64::new(0),
66            threshold,
67            reset_timeout: Duration::from_secs(reset_timeout_secs),
68        }
69    }
70
71    /// Current circuit state.
72    pub fn state(&self) -> CircuitState {
73        CircuitState::from_u8(self.state.load(Ordering::Relaxed))
74    }
75
76    /// Whether a request is allowed through the circuit.
77    pub fn is_allowed(&self) -> bool {
78        match self.state() {
79            CircuitState::Closed => true,
80            CircuitState::Open => {
81                // Check if reset timeout has passed
82                let last_failure = self.last_failure_time.load(Ordering::Relaxed);
83                let now = Instant::now().elapsed().as_secs();
84                if now.saturating_sub(last_failure) > self.reset_timeout.as_secs() {
85                    // Transition to half-open
86                    self.state
87                        .store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
88                    self.success_count.store(0, Ordering::Relaxed);
89                    true
90                } else {
91                    false
92                }
93            }
94            CircuitState::HalfOpen => {
95                // Allow limited requests (up to 2)
96                self.success_count.load(Ordering::Relaxed) < 2
97            }
98        }
99    }
100
101    /// Record a successful request.
102    pub fn record_success(&self) {
103        match self.state() {
104            CircuitState::HalfOpen => {
105                let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
106                if successes >= 2 {
107                    // Recovery successful → closed
108                    self.state
109                        .store(CircuitState::Closed as u8, Ordering::Relaxed);
110                    self.failure_count.store(0, Ordering::Relaxed);
111                    tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
112                }
113            }
114            CircuitState::Closed => {
115                // Reset failure count on success
116                self.failure_count.store(0, Ordering::Relaxed);
117            }
118            CircuitState::Open => {}
119        }
120    }
121
122    /// Record a failed request.
123    pub fn record_failure(&self) {
124        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
125        self.last_failure_time
126            .store(Instant::now().elapsed().as_secs(), Ordering::Relaxed);
127
128        if failures >= self.threshold && self.state() != CircuitState::Open {
129            self.state
130                .store(CircuitState::Open as u8, Ordering::Relaxed);
131            tracing::warn!(
132                failures,
133                threshold = self.threshold,
134                "A2A circuit breaker OPEN"
135            );
136        }
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143
144    #[test]
145    fn test_initial_state_is_closed() {
146        let cb = A2ACircuitBreaker::new(3, 10);
147        assert_eq!(cb.state(), CircuitState::Closed);
148        assert!(cb.is_allowed());
149    }
150
151    #[test]
152    fn test_opens_after_threshold() {
153        let cb = A2ACircuitBreaker::new(3, 10);
154
155        cb.record_failure();
156        assert_eq!(cb.state(), CircuitState::Closed);
157
158        cb.record_failure();
159        assert_eq!(cb.state(), CircuitState::Closed);
160
161        cb.record_failure(); // Now at threshold
162        assert_eq!(cb.state(), CircuitState::Open);
163        assert!(!cb.is_allowed());
164    }
165
166    #[test]
167    fn test_success_resets_failure_count() {
168        let cb = A2ACircuitBreaker::new(3, 10);
169
170        cb.record_failure();
171        cb.record_failure();
172        cb.record_success(); // Should reset
173
174        assert_eq!(cb.failure_count.load(Ordering::Relaxed), 0);
175    }
176}