pub use oxi_ai::circuit_breaker::{CircuitBreakerConfig, CircuitOpenError, ProviderCircuitBreaker};
pub use oxi_ai::fallback_chain::FallbackChain;
pub use oxi_ai::partial_response::PartialResponse;
use oxi_ai::circuit_breaker::CircuitBreakerConfig as CircuitBreakerConfigLocal;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::time::Instant;
/// Error returned by [`CircuitBreaker::allow_request`] when the request is
/// rejected because the circuit is still open.
#[derive(Debug, thiserror::Error)]
#[error("Circuit is open — retry after {remaining:?}")]
pub struct CircuitOpenErrorLocal {
    /// Time left before the breaker will let a request through again:
    /// the configured open duration minus the time elapsed since the
    /// circuit opened, saturating at zero.
    pub remaining: std::time::Duration,
}
/// Circuit breaker that tracks consecutive request outcomes and rejects
/// requests for a configured duration once too many failures pile up.
///
/// All methods take `&self`; mutable state lives in atomics and a mutex.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Thresholds and timings: `failure_threshold`, `open_duration` and
    /// `half_open_successes` are the fields read by this type.
    config: CircuitBreakerConfigLocal,
    /// Current state encoded as an integer: `0` = closed (requests allowed,
    /// failures counted), `1` = open (requests rejected until
    /// `open_duration` has elapsed), `2` = half-open (requests allowed,
    /// successes counted toward closing again).
    state: AtomicU8,
    /// Failures recorded in the closed state since the last recorded success.
    consecutive_failures: AtomicU64,
    /// Successes recorded since the breaker last entered the half-open state.
    consecutive_successes: AtomicU64,
    /// Instant at which the circuit was most recently opened; `None` until
    /// the first trip and again after `reset`.
    opened_at: Mutex<Option<Instant>>,
}
impl CircuitBreaker {
    /// Raw `state` value: requests flow normally and failures are counted.
    const STATE_CLOSED: u8 = 0;
    /// Raw `state` value: requests are rejected until `open_duration` elapses.
    const STATE_OPEN: u8 = 1;
    /// Raw `state` value: requests are allowed and successes are counted
    /// toward closing the circuit again.
    const STATE_HALF_OPEN: u8 = 2;

    /// Creates a breaker in the closed state with all counters at zero and
    /// no recorded opening time.
    pub fn new(config: CircuitBreakerConfigLocal) -> Self {
        Self {
            config,
            state: AtomicU8::new(Self::STATE_CLOSED),
            consecutive_failures: AtomicU64::new(0),
            consecutive_successes: AtomicU64::new(0),
            opened_at: Mutex::new(None),
        }
    }

    /// Decides whether a request may proceed right now.
    ///
    /// Closed and half-open breakers always allow the request. An open
    /// breaker allows it only once `config.open_duration` has passed since
    /// it was opened, in which case the breaker moves to half-open and the
    /// success counter restarts from zero.
    ///
    /// # Errors
    ///
    /// Returns [`CircuitOpenErrorLocal`] carrying the remaining wait time
    /// while the circuit is open and `open_duration` has not yet elapsed.
    pub fn allow_request(&self) -> Result<(), CircuitOpenErrorLocal> {
        if self.state.load(Ordering::SeqCst) != Self::STATE_OPEN {
            return Ok(());
        }

        // The guard is held for the whole decision so that it cannot
        // interleave with `trip`, which updates the timestamp and the state
        // under the same lock.
        let opened_at = self.opened_at.lock();

        // Another caller may have moved the breaker out of the open state
        // while this one was waiting for the lock.
        if self.state.load(Ordering::SeqCst) != Self::STATE_OPEN {
            return Ok(());
        }

        if let Some(since) = *opened_at {
            // Read the clock once so the comparison and the reported
            // remaining time agree with each other.
            let elapsed = since.elapsed();
            if elapsed < self.config.open_duration {
                return Err(CircuitOpenErrorLocal {
                    remaining: self.config.open_duration.saturating_sub(elapsed),
                });
            }
        }

        // Either the open period is over or no opening time is recorded:
        // start probing. The counter is cleared before the new state is
        // published so a success recorded right afterwards is not wiped.
        self.consecutive_successes.store(0, Ordering::SeqCst);
        self.state.store(Self::STATE_HALF_OPEN, Ordering::SeqCst);
        Ok(())
    }

    /// Records a successful request.
    ///
    /// In the closed state this clears the consecutive-failure count. In the
    /// half-open state it counts toward `config.half_open_successes`; once
    /// that many successes are recorded the breaker closes. Successes
    /// recorded while the circuit is open are ignored.
    pub fn record_success(&self) {
        let state = self.state.load(Ordering::SeqCst);
        if state == Self::STATE_CLOSED {
            self.consecutive_failures.store(0, Ordering::SeqCst);
        } else if state == Self::STATE_HALF_OPEN {
            let successes = self.consecutive_successes.fetch_add(1, Ordering::SeqCst) + 1;
            if successes >= self.config.half_open_successes as u64 {
                // Clear the failure count first so the closed state never
                // starts with leftovers from before the trip.
                self.consecutive_failures.store(0, Ordering::SeqCst);
                self.state.store(Self::STATE_CLOSED, Ordering::SeqCst);
            }
        }
    }

    /// Records a failed request.
    ///
    /// In the closed state the failure is counted and the breaker opens once
    /// `config.failure_threshold` consecutive failures are reached. In the
    /// half-open state a single failure re-opens the breaker. Failures
    /// recorded while the circuit is already open are ignored.
    pub fn record_failure(&self) {
        let state = self.state.load(Ordering::SeqCst);
        if state == Self::STATE_CLOSED {
            let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
            if failures >= self.config.failure_threshold as u64 {
                self.trip();
            }
        } else if state == Self::STATE_HALF_OPEN {
            self.trip();
        }
    }

    /// Returns the breaker to its initial closed state, clearing both
    /// counters and the recorded opening time.
    pub fn reset(&self) {
        // Taken first so the reset cannot interleave with `trip` or with the
        // half-open transition in `allow_request`.
        let mut opened_at = self.opened_at.lock();
        *opened_at = None;
        self.state.store(Self::STATE_CLOSED, Ordering::SeqCst);
        self.consecutive_failures.store(0, Ordering::SeqCst);
        self.consecutive_successes.store(0, Ordering::SeqCst);
    }

    /// Opens the circuit, stamping the opening time.
    ///
    /// The timestamp is written before the open state is published, and both
    /// happen under the `opened_at` lock. Publishing the state first would
    /// let a concurrent `allow_request` observe "open" together with a
    /// missing or stale timestamp and immediately undo the trip.
    fn trip(&self) {
        let mut opened_at = self.opened_at.lock();
        *opened_at = Some(Instant::now());
        self.state.store(Self::STATE_OPEN, Ordering::SeqCst);
    }
}