Skip to main content

oxios_kernel/a2a/
circuit_breaker.rs

1//! A2A protocol circuit breaker for delegation reliability.
2//!
3//! Prevents cascading failures when A2A delegation repeatedly fails.
4//!
5//! # Example
6//!
7//! ```
8//! use oxios_kernel::a2a::circuit_breaker::{A2ACircuitBreaker, CircuitState};
9//!
10//! let cb = A2ACircuitBreaker::new(3, 30);  // 3 failures, 30s reset
11//! assert_eq!(cb.state(), CircuitState::Closed);
12//! assert!(cb.is_allowed());
13//! ```
14
15use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering};
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
/// Wall-clock time in whole seconds since the UNIX epoch.
///
/// The breaker keeps its last-failure timestamp in an `AtomicU64`, and an
/// `Instant` has no integer encoding, so the system clock is used instead.
/// `Instant::now().elapsed()` is *not* a substitute: it is always ~0, which
/// would leave an open breaker stuck forever. A clock reading earlier than
/// the epoch yields `0` rather than panicking.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
28
/// State of an [`A2ACircuitBreaker`].
///
/// The explicit discriminants are the values the breaker stores in its
/// `AtomicU8`; the private `from_u8` decoder maps them back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation — requests allowed.
    Closed = 0,
    /// Too many failures — requests blocked.
    Open = 1,
    /// Testing recovery — requests are admitted until enough of them succeed
    /// to close the circuit, or one fails and reopens it.
    HalfOpen = 2,
}

impl CircuitState {
    /// Decode the tag stored in the breaker's `AtomicU8`.
    ///
    /// Only the discriminants above are ever written; any other byte falls
    /// back to `Closed`.
    fn from_u8(raw: u8) -> Self {
        const OPEN: u8 = CircuitState::Open as u8;
        const HALF_OPEN: u8 = CircuitState::HalfOpen as u8;

        match raw {
            OPEN => CircuitState::Open,
            HALF_OPEN => CircuitState::HalfOpen,
            _ => CircuitState::Closed,
        }
    }
}
50
51/// A2A delegation circuit breaker.
52///
53/// Tracks failures and opens the circuit when threshold is exceeded.
54/// After timeout, allows limited test requests (half-open state).
55#[derive(Debug)]
56pub struct A2ACircuitBreaker {
57    state: AtomicU8,
58    failure_count: AtomicU32,
59    success_count: AtomicU32,
60    last_failure_time: AtomicU64,
61    threshold: u32,
62    reset_timeout: Duration,
63}
64
65impl A2ACircuitBreaker {
66    /// Create a new circuit breaker.
67    ///
68    /// # Arguments
69    /// * `threshold` - Number of consecutive failures before opening
70    /// * `reset_timeout_secs` - Seconds to wait before testing recovery
71    pub fn new(threshold: u32, reset_timeout_secs: u64) -> Self {
72        Self {
73            state: AtomicU8::new(CircuitState::Closed as u8),
74            failure_count: AtomicU32::new(0),
75            success_count: AtomicU32::new(0),
76            last_failure_time: AtomicU64::new(0),
77            threshold,
78            reset_timeout: Duration::from_secs(reset_timeout_secs),
79        }
80    }
81
82    /// Current circuit state.
83    pub fn state(&self) -> CircuitState {
84        CircuitState::from_u8(self.state.load(Ordering::Relaxed))
85    }
86
87    /// Whether a request is allowed through the circuit.
88    pub fn is_allowed(&self) -> bool {
89        match self.state() {
90            CircuitState::Closed => true,
91            CircuitState::Open => {
92                // Check if reset timeout has passed
93                let last_failure = self.last_failure_time.load(Ordering::Relaxed);
94                let elapsed = now_epoch_secs().saturating_sub(last_failure);
95                if elapsed > self.reset_timeout.as_secs() {
96                    // Transition to half-open
97                    self.state
98                        .store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
99                    self.success_count.store(0, Ordering::Relaxed);
100                    true
101                } else {
102                    false
103                }
104            }
105            CircuitState::HalfOpen => {
106                // Allow limited requests (up to 2)
107                self.success_count.load(Ordering::Relaxed) < 2
108            }
109        }
110    }
111
112    /// Record a successful request.
113    pub fn record_success(&self) {
114        match self.state() {
115            CircuitState::HalfOpen => {
116                let successes = self.success_count.fetch_add(1, Ordering::Relaxed) + 1;
117                if successes >= 2 {
118                    // Recovery successful → closed
119                    self.state
120                        .store(CircuitState::Closed as u8, Ordering::Relaxed);
121                    self.failure_count.store(0, Ordering::Relaxed);
122                    tracing::info!("A2A circuit breaker CLOSED (recovery successful)");
123                }
124            }
125            CircuitState::Closed => {
126                // Reset failure count on success
127                self.failure_count.store(0, Ordering::Relaxed);
128            }
129            CircuitState::Open => {}
130        }
131    }
132
133    /// Record a failed request.
134    pub fn record_failure(&self) {
135        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
136        self.last_failure_time
137            .store(now_epoch_secs(), Ordering::Relaxed);
138
139        if failures >= self.threshold && self.state() != CircuitState::Open {
140            self.state
141                .store(CircuitState::Open as u8, Ordering::Relaxed);
142            tracing::warn!(
143                failures,
144                threshold = self.threshold,
145                "A2A circuit breaker OPEN"
146            );
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Open `cb` by recording exactly `threshold` failures.
    fn trip(cb: &A2ACircuitBreaker) {
        for _ in 0..cb.threshold {
            cb.record_failure();
        }
    }

    /// Backdate the last failure so the reset timeout has clearly expired.
    fn expire_timeout(cb: &A2ACircuitBreaker) {
        let long_ago = now_epoch_secs().saturating_sub(cb.reset_timeout.as_secs() + 60);
        cb.last_failure_time.store(long_ago, Ordering::SeqCst);
    }

    #[test]
    fn test_initial_state_is_closed() {
        let cb = A2ACircuitBreaker::new(3, 10);
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.is_allowed());
    }

    #[test]
    fn test_opens_after_threshold() {
        let cb = A2ACircuitBreaker::new(3, 10);

        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Closed);

        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Closed);

        cb.record_failure(); // Now at threshold
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.is_allowed());
    }

    #[test]
    fn test_success_resets_failure_count() {
        let cb = A2ACircuitBreaker::new(3, 10);

        cb.record_failure();
        cb.record_failure();
        cb.record_success(); // Should reset

        assert_eq!(cb.failure_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_open_moves_to_half_open_after_timeout() {
        let cb = A2ACircuitBreaker::new(3, 10);
        trip(&cb);
        assert!(!cb.is_allowed());

        expire_timeout(&cb);
        assert!(cb.is_allowed());
        assert_eq!(cb.state(), CircuitState::HalfOpen);
    }

    #[test]
    fn test_half_open_closes_after_two_successes() {
        let cb = A2ACircuitBreaker::new(3, 10);
        trip(&cb);
        expire_timeout(&cb);
        assert!(cb.is_allowed());

        cb.record_success();
        assert_eq!(cb.state(), CircuitState::HalfOpen);
        assert!(cb.is_allowed());

        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert_eq!(cb.failure_count.load(Ordering::Relaxed), 0);
        assert!(cb.is_allowed());
    }

    #[test]
    fn test_half_open_failure_reopens() {
        let cb = A2ACircuitBreaker::new(3, 10);
        trip(&cb);
        expire_timeout(&cb);
        assert!(cb.is_allowed());

        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        // The fresh failure restarts the reset timeout.
        assert!(!cb.is_allowed());
    }

    #[test]
    fn test_success_while_open_is_ignored() {
        let cb = A2ACircuitBreaker::new(3, 10);
        trip(&cb);

        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.is_allowed());
    }

    #[test]
    fn test_unknown_state_byte_decodes_as_closed() {
        assert_eq!(CircuitState::from_u8(42), CircuitState::Closed);
    }
}