#![warn(missing_docs)]
use super::backtrace::DecrustBacktrace as Backtrace;
use super::{DecrustError, Result};
use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};
use tracing::info;
/// Wrapper that hides its contents from `Debug` output.
///
/// Formatting a `DebugIgnore` with `{:?}` always prints the fixed placeholder
/// `<function>`, whatever the wrapped value is. The wrapped value stays
/// accessible through the public tuple field.
pub struct DebugIgnore<T: ?Sized>(pub T);

impl<T: ?Sized> fmt::Debug for DebugIgnore<T> {
    /// Writes the `<function>` placeholder instead of the wrapped value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("<function>")
    }
}

impl<T: Clone> Clone for DebugIgnore<T> {
    /// Clones the wrapped value into a new `DebugIgnore`.
    fn clone(&self) -> Self {
        let Self(wrapped) = self;
        Self(wrapped.clone())
    }
}
#[cfg(feature = "rand")]
#[allow(unused_imports)]
use rand::Rng;
#[cfg(feature = "tokio")]
use tokio::time;
/// The state a circuit breaker is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum CircuitBreakerState {
    /// Operations are executed normally; this is the default state.
    #[default]
    Closed,
    /// Operations are rejected until the configured reset timeout has elapsed.
    Open,
    /// A limited number of concurrent trial operations is let through.
    HalfOpen,
}

impl fmt::Display for CircuitBreakerState {
    /// Writes the variant name (`Closed`, `Open` or `HalfOpen`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Closed => "Closed",
            Self::Open => "Open",
            Self::HalfOpen => "HalfOpen",
        };
        f.write_str(label)
    }
}
/// Outcome category reported to observers for a single operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CircuitOperationType {
    /// The operation was recorded as a success.
    Success,
    /// The operation returned an error that counts as a failure.
    Failure,
    /// The breaker refused to run the operation.
    Rejected,
    /// The operation exceeded the configured operation timeout.
    Timeout,
}
/// Description of a single state transition, handed to observers.
#[derive(Debug, Clone)]
pub struct CircuitTransitionEvent {
    /// State the breaker was in before the transition.
    pub from_state: CircuitBreakerState,
    /// State the breaker moved to.
    pub to_state: CircuitBreakerState,
    /// Wall-clock time captured when the event was built.
    pub timestamp: SystemTime,
    /// Human-readable reason, e.g. `"Manual trip"` or `"Reset timeout elapsed"`.
    pub reason: String,
}
/// Callbacks through which a circuit breaker reports what it is doing.
///
/// Every callback receives the breaker's name as `name`. The breaker invokes
/// the callbacks while its observer list is locked, so an observer should not
/// call `add_observer` on the same breaker from inside a callback.
pub trait CircuitBreakerObserver: Send + Sync {
    /// Called after the breaker moved from one state to another.
    fn on_state_change(&self, name: &str, event: &CircuitTransitionEvent);

    /// Called at the start of an execution with the state observed at that moment.
    fn on_operation_attempt(&self, name: &str, state: CircuitBreakerState);

    /// Called once the outcome of an operation has been recorded.
    ///
    /// `duration` is zero for rejected calls and the configured operation
    /// timeout for timed-out calls. `error` is `None` for successes and
    /// rejections.
    fn on_operation_result(
        &self,
        name: &str,
        op_type: CircuitOperationType,
        duration: Duration,
        error: Option<&DecrustError>,
    );

    /// Called after a manual reset of the breaker.
    fn on_reset(&self, name: &str);
}
/// Snapshot of the counters a circuit breaker maintains.
#[derive(Debug, Clone, Default)]
pub struct CircuitMetrics {
    /// Breaker state as of the last transition.
    pub state: CircuitBreakerState,
    /// Number of recorded outcomes of any kind (success, failure, rejection, timeout).
    pub total_requests: u64,
    /// Number of outcomes recorded as successes.
    pub successful_requests: u64,
    /// Number of outcomes recorded as failures.
    pub failed_requests: u64,
    /// Number of calls the breaker refused to run.
    pub rejected_requests: u64,
    /// Number of calls recorded as timed out.
    pub timeout_requests: u64,
    /// Current run of failures without an intervening success.
    pub consecutive_failures: u32,
    /// Current run of successes without an intervening failure.
    pub consecutive_successes: u32,
    /// Wall-clock time of the most recent failure or timeout, if any.
    pub last_error_timestamp: Option<SystemTime>,
    /// Wall-clock time of the most recent state transition, if any.
    pub last_transition_timestamp: Option<SystemTime>,
    /// Fraction of failed entries in the results window; `None` while the window is empty.
    pub failure_rate_in_window: Option<f64>,
    /// Fraction of slow entries in the slow-call window; `None` while the window is empty.
    pub slow_call_rate_in_window: Option<f64>,
}
/// Shared, thread-safe predicate over a [`DecrustError`].
///
/// When configured on a breaker, an error counts as a failure only if the
/// predicate returns `true` for it.
pub type ErrorPredicate = Arc<dyn Fn(&DecrustError) -> bool + Send + Sync>;
/// Tunable parameters of a circuit breaker.
#[derive(Clone)]
pub struct CircuitBreakerConfig {
    /// Number of consecutive failures at which the breaker opens.
    pub failure_threshold: usize,
    /// Failure fraction in the results window at which the breaker opens.
    pub failure_rate_threshold: f64,
    /// Minimum number of entries in the results window before the failure rate is evaluated.
    pub minimum_request_threshold_for_rate: usize,
    /// Number of consecutive successes in the half-open state needed to close the breaker.
    pub success_threshold_to_close: usize,
    /// How long the breaker stays open before a call may move it to half-open.
    pub reset_timeout: Duration,
    /// Maximum number of operations allowed in flight while half-open.
    pub half_open_max_concurrent_operations: usize,
    /// Per-operation timeout; `None` disables timeout handling.
    pub operation_timeout: Option<Duration>,
    /// Maximum number of entries kept in the results and slow-call windows.
    pub sliding_window_size: usize,
    /// Optional filter; when set, only errors it accepts count as failures.
    pub error_predicate: Option<ErrorPredicate>,
    /// Not referenced by the breaker logic visible here — TODO confirm where it is consumed.
    pub metrics_window_size: usize,
    /// Not referenced by the breaker logic visible here — TODO confirm where it is consumed.
    pub track_metrics: bool,
    /// Calls lasting at least this long are marked slow; `None` marks no call as slow.
    pub slow_call_duration_threshold: Option<Duration>,
    /// Slow-call fraction at which the breaker opens; `None` disables the check.
    pub slow_call_rate_threshold: Option<f64>,
    /// Not referenced by the breaker logic visible here — TODO confirm where it is consumed.
    pub circuit_breaker_threshold: u32,
    /// Not referenced by the breaker logic visible here — TODO confirm where it is consumed.
    pub circuit_breaker_cooldown: Duration,
}
impl Default for CircuitBreakerConfig {
    /// Builds the default configuration: 5 consecutive failures or a 50 %
    /// failure rate (over at least 10 recorded calls) open the breaker, it
    /// stays open for 30 s, and 3 consecutive half-open successes close it.
    fn default() -> Self {
        Self {
            // Opening criteria.
            failure_threshold: 5,
            failure_rate_threshold: 0.5,
            minimum_request_threshold_for_rate: 10,
            // Recovery behaviour.
            success_threshold_to_close: 3,
            reset_timeout: Duration::from_secs(30),
            half_open_max_concurrent_operations: 1,
            // Per-call handling.
            operation_timeout: Some(Duration::from_secs(5)),
            sliding_window_size: 100,
            error_predicate: None,
            // Metrics.
            metrics_window_size: 100,
            track_metrics: true,
            // Slow-call detection is off by default.
            slow_call_duration_threshold: None,
            slow_call_rate_threshold: None,
            circuit_breaker_threshold: 3,
            circuit_breaker_cooldown: Duration::from_secs(60),
        }
    }
}
impl fmt::Debug for CircuitBreakerConfig {
    /// Formats every field; the error predicate is a closure and therefore
    /// shown only as a `Some(<function>)` / `None` marker string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let predicate_marker: &str = match self.error_predicate {
            Some(_) => "Some(<function>)",
            None => "None",
        };

        let mut out = f.debug_struct("CircuitBreakerConfig");
        out.field("failure_threshold", &self.failure_threshold);
        out.field("failure_rate_threshold", &self.failure_rate_threshold);
        out.field(
            "minimum_request_threshold_for_rate",
            &self.minimum_request_threshold_for_rate,
        );
        out.field(
            "success_threshold_to_close",
            &self.success_threshold_to_close,
        );
        out.field("reset_timeout", &self.reset_timeout);
        out.field(
            "half_open_max_concurrent_operations",
            &self.half_open_max_concurrent_operations,
        );
        out.field("operation_timeout", &self.operation_timeout);
        out.field("sliding_window_size", &self.sliding_window_size);
        out.field("error_predicate", &predicate_marker);
        out.field("metrics_window_size", &self.metrics_window_size);
        out.field("track_metrics", &self.track_metrics);
        out.field(
            "slow_call_duration_threshold",
            &self.slow_call_duration_threshold,
        );
        out.field("slow_call_rate_threshold", &self.slow_call_rate_threshold);
        out.field("circuit_breaker_threshold", &self.circuit_breaker_threshold);
        out.field("circuit_breaker_cooldown", &self.circuit_breaker_cooldown);
        out.finish()
    }
}
/// Mutable state of a breaker, kept behind the breaker's `RwLock`.
#[derive(Debug)]
struct InnerState {
    /// Current breaker state.
    state: CircuitBreakerState,
    /// When the breaker last entered the open state; cleared on close/reset.
    opened_at: Option<Instant>,
    /// When the breaker last entered the half-open state; cleared on close/reset.
    half_open_entered_at: Option<Instant>,
    /// Current run of failures (timeouts included).
    consecutive_failures: usize,
    /// Current run of successes.
    consecutive_successes: usize,
    /// Number of operations currently in flight in the half-open state.
    half_open_concurrency_count: usize,
    /// Recent outcomes, oldest first: `true` = success, `false` = failure/timeout.
    results_window: VecDeque<bool>,
    /// Recent calls, oldest first: `true` = the call was slow.
    slow_call_window: VecDeque<bool>,
    /// Counters exposed through `CircuitBreaker::metrics`.
    metrics: CircuitMetrics,
}
impl Default for InnerState {
    /// Starts closed, with zeroed counters and two empty windows that each
    /// have room for 100 entries pre-allocated.
    fn default() -> Self {
        let results_window = VecDeque::with_capacity(100);
        let slow_call_window = VecDeque::with_capacity(100);
        Self {
            state: CircuitBreakerState::Closed,
            opened_at: None,
            half_open_entered_at: None,
            consecutive_failures: 0,
            consecutive_successes: 0,
            half_open_concurrency_count: 0,
            results_window,
            slow_call_window,
            metrics: CircuitMetrics::default(),
        }
    }
}
/// A named circuit breaker guarding calls to a fallible operation.
pub struct CircuitBreaker {
    /// Name used in log lines, errors and observer callbacks.
    name: String,
    /// Immutable configuration supplied at construction.
    config: CircuitBreakerConfig,
    /// Mutable breaker state and metrics.
    inner: RwLock<InnerState>,
    /// Registered observers, notified in registration order.
    observers: Mutex<Vec<Arc<dyn CircuitBreakerObserver>>>,
}
impl CircuitBreaker {
/// Creates a closed breaker with the given name and configuration and no
/// observers, wrapped in an `Arc`.
pub fn new(name: impl Into<String>, config: CircuitBreakerConfig) -> Arc<Self> {
    let breaker = Self {
        name: name.into(),
        config,
        inner: RwLock::new(InnerState::default()),
        observers: Mutex::new(Vec::new()),
    };
    Arc::new(breaker)
}
/// Registers an observer; it is appended after those already registered.
///
/// # Panics
///
/// Panics if the observer mutex is poisoned.
pub fn add_observer(&self, observer: Arc<dyn CircuitBreakerObserver>) {
    self.observers.lock().unwrap().push(observer);
}
/// Returns the breaker's current state.
///
/// # Panics
///
/// Panics if the state lock is poisoned.
pub fn state(&self) -> CircuitBreakerState {
    self.inner.read().unwrap().state
}
/// Returns a copy of the breaker's current metrics.
///
/// # Panics
///
/// Panics if the state lock is poisoned.
pub fn metrics(&self) -> CircuitMetrics {
    self.inner.read().unwrap().metrics.clone()
}
/// Manually forces the breaker into the open state.
///
/// The consecutive-failure counter is set to the configured failure
/// threshold, the success counter is cleared, and observers receive a state
/// change with the reason `"Manual trip"`.
pub fn trip(&self) {
    let event = {
        let mut guard = self.inner.write().unwrap();
        let from_state = guard.state;

        guard.state = CircuitBreakerState::Open;
        guard.opened_at = Some(Instant::now());
        guard.consecutive_failures = self.config.failure_threshold;
        guard.consecutive_successes = 0;

        let event = CircuitTransitionEvent {
            from_state,
            to_state: CircuitBreakerState::Open,
            timestamp: SystemTime::now(),
            reason: String::from("Manual trip"),
        };

        let failures = guard.consecutive_failures as u32;
        guard.metrics.state = CircuitBreakerState::Open;
        guard.metrics.consecutive_failures = failures;
        guard.metrics.consecutive_successes = 0;
        guard.metrics.last_transition_timestamp = Some(SystemTime::now());
        event
    };
    // The lock is released before observers run.
    self.notify_state_change(&event);
}
/// Manually forces the breaker into the closed state.
///
/// Clears the open/half-open timestamps, the consecutive counters, the
/// half-open in-flight count and both sliding windows. Observers receive a
/// state change with the reason `"Manual reset"`, followed by `on_reset`.
pub fn reset(&self) {
    let event = {
        let mut guard = self.inner.write().unwrap();
        let from_state = guard.state;

        guard.state = CircuitBreakerState::Closed;
        guard.opened_at = None;
        guard.half_open_entered_at = None;
        guard.consecutive_failures = 0;
        guard.consecutive_successes = 0;
        guard.half_open_concurrency_count = 0;

        guard.metrics.state = CircuitBreakerState::Closed;
        guard.metrics.consecutive_failures = 0;
        guard.metrics.consecutive_successes = 0;
        guard.metrics.last_transition_timestamp = Some(SystemTime::now());

        guard.results_window.clear();
        guard.slow_call_window.clear();

        CircuitTransitionEvent {
            from_state,
            to_state: CircuitBreakerState::Closed,
            timestamp: SystemTime::now(),
            reason: String::from("Manual reset"),
        }
    };
    // The lock is released before observers run.
    self.notify_state_change(&event);
    self.notify_reset();
}
/// Runs `operation` under the breaker's protection.
///
/// The state is sampled once and observers are told about the attempt:
///
/// * `Closed` – the operation runs through the closed-state path.
/// * `HalfOpen` – the operation runs through the half-open path.
/// * `Open` – if the reset timeout has elapsed the breaker moves to
///   half-open and the operation runs as a trial; otherwise the call is
///   recorded as rejected.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` when the call is rejected
/// (`retry_after` holds the remaining part of the reset timeout), and
/// otherwise whatever the selected execution path returns.
#[cfg(not(feature = "std-thread"))]
pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret>,
{
    let start_time = Instant::now();
    let state = self.state();
    self.notify_operation_attempt(state);
    match state {
        CircuitBreakerState::Open => {
            // Snapshot `opened_at` exactly once. Re-reading it later (and
            // unwrapping) would panic if another thread reset or closed the
            // breaker in between, because that clears the timestamp.
            let open_for = self
                .inner
                .read()
                .unwrap()
                .opened_at
                .map(|opened_at| opened_at.elapsed());
            match open_for {
                Some(elapsed) if elapsed >= self.config.reset_timeout => {
                    self.transition_to_half_open("Reset timeout elapsed");
                    self.execute_half_open(operation, start_time)
                }
                _ => {
                    self.record_rejected();
                    // No timestamp means the breaker is no longer open, so
                    // there is nothing left to wait for.
                    let retry_after = open_for
                        .map(|elapsed| {
                            self.config
                                .reset_timeout
                                .checked_sub(elapsed)
                                .unwrap_or_default()
                        })
                        .unwrap_or_default();
                    Err(DecrustError::CircuitBreakerOpen {
                        name: self.name.clone(),
                        retry_after: Some(retry_after),
                        failure_count: None,
                        last_error: None,
                        backtrace: Backtrace::generate(),
                    })
                }
            }
        }
        CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
        CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
    }
}
/// Runs `operation` under the breaker's protection (thread-backed timeout build).
///
/// The state is sampled once and observers are told about the attempt:
///
/// * `Closed` – the operation runs through the closed-state path.
/// * `HalfOpen` – the operation runs through the half-open path.
/// * `Open` – if the reset timeout has elapsed the breaker moves to
///   half-open and the operation runs as a trial; otherwise the call is
///   recorded as rejected.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` when the call is rejected
/// (`retry_after` holds the remaining part of the reset timeout and
/// `failure_count` the current consecutive-failure count), and otherwise
/// whatever the selected execution path returns.
#[cfg(feature = "std-thread")]
pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    let start_time = Instant::now();
    let state = self.state();
    self.notify_operation_attempt(state);
    match state {
        CircuitBreakerState::Open => {
            // Take one consistent snapshot. Re-reading `opened_at` later (and
            // unwrapping) would panic if another thread reset or closed the
            // breaker in between, because that clears the timestamp.
            let (open_for, failures) = {
                let inner = self.inner.read().unwrap();
                (
                    inner.opened_at.map(|opened_at| opened_at.elapsed()),
                    inner.consecutive_failures,
                )
            };
            match open_for {
                Some(elapsed) if elapsed >= self.config.reset_timeout => {
                    self.transition_to_half_open("Reset timeout elapsed");
                    self.execute_half_open(operation, start_time)
                }
                _ => {
                    self.record_rejected();
                    // No timestamp means the breaker is no longer open, so
                    // there is nothing left to wait for.
                    let retry_after = open_for
                        .map(|elapsed| {
                            self.config
                                .reset_timeout
                                .checked_sub(elapsed)
                                .unwrap_or_default()
                        })
                        .unwrap_or_default();
                    Err(DecrustError::CircuitBreakerOpen {
                        name: self.name.clone(),
                        retry_after: Some(retry_after),
                        failure_count: Some(failures as u32),
                        last_error: None,
                        backtrace: Backtrace::generate(),
                    })
                }
            }
        }
        CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
        CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
    }
}
/// Asynchronous counterpart of `execute`: runs the future produced by
/// `operation` under the breaker's protection.
///
/// The state is sampled once and observers are told about the attempt:
///
/// * `Closed` – the operation runs through the closed-state path.
/// * `HalfOpen` – the operation runs through the half-open path.
/// * `Open` – if the reset timeout has elapsed the breaker moves to
///   half-open and the operation runs as a trial; otherwise the call is
///   recorded as rejected.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` when the call is rejected
/// (`retry_after` holds the remaining part of the reset timeout and
/// `failure_count` the current consecutive-failure count), and otherwise
/// whatever the selected execution path returns.
#[cfg(feature = "tokio")]
pub async fn execute_async<F, Fut, Ret>(&self, operation: F) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    let start_time = Instant::now();
    let state = self.state();
    self.notify_operation_attempt(state);
    match state {
        CircuitBreakerState::Open => {
            // Take one consistent snapshot inside a block so the guard is
            // released before any `.await`. Re-reading `opened_at` later (and
            // unwrapping) would panic if another task reset or closed the
            // breaker in between, because that clears the timestamp.
            let (open_for, failures) = {
                let inner = self.inner.read().unwrap();
                (
                    inner.opened_at.map(|opened_at| opened_at.elapsed()),
                    inner.consecutive_failures,
                )
            };
            match open_for {
                Some(elapsed) if elapsed >= self.config.reset_timeout => {
                    self.transition_to_half_open("Reset timeout elapsed");
                    self.execute_half_open_async(operation, start_time).await
                }
                _ => {
                    self.record_rejected();
                    // No timestamp means the breaker is no longer open, so
                    // there is nothing left to wait for.
                    let retry_after = open_for
                        .map(|elapsed| {
                            self.config
                                .reset_timeout
                                .checked_sub(elapsed)
                                .unwrap_or_default()
                        })
                        .unwrap_or_default();
                    Err(DecrustError::CircuitBreakerOpen {
                        name: self.name.clone(),
                        retry_after: Some(retry_after),
                        failure_count: Some(failures as u32),
                        last_error: None,
                        backtrace: Backtrace::generate(),
                    })
                }
            }
        }
        CircuitBreakerState::HalfOpen => {
            self.execute_half_open_async(operation, start_time).await
        }
        CircuitBreakerState::Closed => self.execute_closed_async(operation, start_time).await,
    }
}
/// Runs `operation` while the breaker is closed and records the outcome.
///
/// An `Ok` result, or an error the error predicate does not count, is
/// recorded as a success. A counted error is recorded as a failure and may
/// open the breaker. The operation's result is returned unchanged.
///
/// NOTE(review): when a timeout is configured, `execute_with_timeout` has
/// already called `record_timeout` before returning its `Timeout` error, and
/// that error then also goes through the failure path here — confirm the
/// double count is intended.
#[cfg(not(feature = "std-thread"))]
fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret>,
{
    let result = match self.config.operation_timeout {
        Some(limit) => self.execute_with_timeout(operation, limit),
        None => operation(),
    };
    let elapsed = start_time.elapsed();

    match result.as_ref().err() {
        Some(err) if self.should_count_as_failure(err) => {
            self.record_failure(err, elapsed);
            if self.should_open_circuit() {
                self.transition_to_open("Failure threshold reached");
            }
        }
        // `Ok`, or an error excluded by the predicate.
        _ => self.record_success(elapsed),
    }
    result
}
/// Runs `operation` while the breaker is closed and records the outcome
/// (thread-backed timeout build).
///
/// An `Ok` result, or an error the error predicate does not count, is
/// recorded as a success. A counted error is recorded as a failure and may
/// open the breaker. The operation's result is returned unchanged.
///
/// NOTE(review): when a timeout is configured, `execute_with_timeout` has
/// already called `record_timeout` before returning its `Timeout` error, and
/// that error then also goes through the failure path here — confirm the
/// double count is intended.
#[cfg(feature = "std-thread")]
fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    let result = match self.config.operation_timeout {
        Some(limit) => self.execute_with_timeout(operation, limit),
        None => operation(),
    };
    let elapsed = start_time.elapsed();

    match result.as_ref().err() {
        Some(err) if self.should_count_as_failure(err) => {
            self.record_failure(err, elapsed);
            if self.should_open_circuit() {
                self.transition_to_open("Failure threshold reached");
            }
        }
        // `Ok`, or an error excluded by the predicate.
        _ => self.record_success(elapsed),
    }
    result
}
/// Runs `operation` as a trial call while the breaker is half-open.
///
/// At most `half_open_max_concurrent_operations` trials may be in flight;
/// further calls are recorded as rejected. A success that brings the
/// consecutive-success count up to `success_threshold_to_close` closes the
/// breaker; a counted failure re-opens it immediately.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` (with a 100 ms `retry_after`)
/// when the trial limit is reached, otherwise the operation's own result.
#[cfg(not(feature = "std-thread"))]
fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret>,
{
    // Decide admission under the write lock, but release it before calling
    // `record_rejected`: that method takes the same lock, and re-acquiring a
    // `std::sync::RwLock` on the thread that already holds it deadlocks or
    // panics.
    let admitted = {
        let mut inner = self.inner.write().unwrap();
        let has_capacity =
            inner.half_open_concurrency_count < self.config.half_open_max_concurrent_operations;
        if has_capacity {
            inner.half_open_concurrency_count += 1;
        }
        has_capacity
    };
    if !admitted {
        self.record_rejected();
        return Err(DecrustError::CircuitBreakerOpen {
            name: self.name.clone(),
            retry_after: Some(Duration::from_millis(100)),
            failure_count: None,
            last_error: None,
            backtrace: Backtrace::generate(),
        });
    }

    let result = if let Some(timeout) = self.config.operation_timeout {
        self.execute_with_timeout(operation, timeout)
    } else {
        operation()
    };
    let duration = start_time.elapsed();

    // The trial is over; free its slot.
    {
        let mut inner = self.inner.write().unwrap();
        inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
    }

    match &result {
        Ok(_) => {
            self.record_success(duration);
            let close_circuit = {
                let inner = self.inner.read().unwrap();
                inner.consecutive_successes >= self.config.success_threshold_to_close
            };
            if close_circuit {
                self.transition_to_closed("Success threshold reached");
            }
        }
        Err(e) => {
            if self.should_count_as_failure(e) {
                self.record_failure(e, duration);
                self.transition_to_open("Failure in half-open state");
            } else {
                // Errors excluded by the predicate are treated as successes.
                self.record_success(duration);
            }
        }
    }
    result
}
/// Runs `operation` as a trial call while the breaker is half-open
/// (thread-backed timeout build).
///
/// At most `half_open_max_concurrent_operations` trials may be in flight;
/// further calls are recorded as rejected. A success that brings the
/// consecutive-success count up to `success_threshold_to_close` closes the
/// breaker; a counted failure re-opens it immediately.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` (with a 100 ms `retry_after`
/// and the current consecutive-failure count) when the trial limit is
/// reached, otherwise the operation's own result.
#[cfg(feature = "std-thread")]
fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    // Decide admission under the write lock and capture the failure count
    // there, then release the lock before calling `record_rejected`. That
    // method (and any further `self.inner.read()`) takes the same lock, and
    // re-acquiring a `std::sync::RwLock` on the thread that already holds it
    // deadlocks or panics.
    let rejected_with_failures = {
        let mut inner = self.inner.write().unwrap();
        if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
        {
            Some(inner.consecutive_failures)
        } else {
            inner.half_open_concurrency_count += 1;
            None
        }
    };
    if let Some(failures) = rejected_with_failures {
        self.record_rejected();
        return Err(DecrustError::CircuitBreakerOpen {
            name: self.name.clone(),
            retry_after: Some(Duration::from_millis(100)),
            failure_count: Some(failures as u32),
            last_error: None,
            backtrace: Backtrace::generate(),
        });
    }

    let result = if let Some(timeout) = self.config.operation_timeout {
        self.execute_with_timeout(operation, timeout)
    } else {
        operation()
    };
    let duration = start_time.elapsed();

    // The trial is over; free its slot.
    {
        let mut inner = self.inner.write().unwrap();
        inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
    }

    match &result {
        Ok(_) => {
            self.record_success(duration);
            let close_circuit = {
                let inner = self.inner.read().unwrap();
                inner.consecutive_successes >= self.config.success_threshold_to_close
            };
            if close_circuit {
                self.transition_to_closed("Success threshold reached");
            }
        }
        Err(e) => {
            if self.should_count_as_failure(e) {
                self.record_failure(e, duration);
                self.transition_to_open("Failure in half-open state");
            } else {
                // Errors excluded by the predicate are treated as successes.
                self.record_success(duration);
            }
        }
    }
    result
}
/// Awaits the future produced by `operation` while the breaker is closed and
/// records the outcome.
///
/// An `Ok` result, or an error the error predicate does not count, is
/// recorded as a success. A counted error is recorded as a failure and may
/// open the breaker. The operation's result is returned unchanged.
///
/// NOTE(review): when a timeout is configured, `execute_with_timeout_async`
/// has already called `record_timeout` before returning its `Timeout` error,
/// and that error then also goes through the failure path here — confirm the
/// double count is intended.
#[cfg(feature = "tokio")]
async fn execute_closed_async<F, Fut, Ret>(
    &self,
    operation: F,
    start_time: Instant,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    let result = match self.config.operation_timeout {
        Some(limit) => self.execute_with_timeout_async(operation, limit).await,
        None => operation().await,
    };
    let elapsed = start_time.elapsed();

    match result.as_ref().err() {
        Some(err) if self.should_count_as_failure(err) => {
            self.record_failure(err, elapsed);
            if self.should_open_circuit() {
                self.transition_to_open("Failure threshold reached");
            }
        }
        // `Ok`, or an error excluded by the predicate.
        _ => self.record_success(elapsed),
    }
    result
}
/// Awaits the future produced by `operation` as a trial call while the
/// breaker is half-open.
///
/// At most `half_open_max_concurrent_operations` trials may be in flight;
/// further calls are recorded as rejected. A success that brings the
/// consecutive-success count up to `success_threshold_to_close` closes the
/// breaker; a counted failure re-opens it immediately.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` (with a 100 ms `retry_after`
/// and the current consecutive-failure count) when the trial limit is
/// reached, otherwise the operation's own result.
#[cfg(feature = "tokio")]
async fn execute_half_open_async<F, Fut, Ret>(
    &self,
    operation: F,
    start_time: Instant,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    // Decide admission under the write lock and capture the failure count
    // there, then release the lock before calling `record_rejected`. That
    // method (and any further `self.inner.read()`) takes the same lock, and
    // re-acquiring a `std::sync::RwLock` on the thread that already holds it
    // deadlocks or panics. The block also guarantees that no guard is alive
    // across the `.await` below.
    let rejected_with_failures = {
        let mut inner = self.inner.write().unwrap();
        if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
        {
            Some(inner.consecutive_failures)
        } else {
            inner.half_open_concurrency_count += 1;
            None
        }
    };
    if let Some(failures) = rejected_with_failures {
        self.record_rejected();
        return Err(DecrustError::CircuitBreakerOpen {
            name: self.name.clone(),
            retry_after: Some(Duration::from_millis(100)),
            failure_count: Some(failures as u32),
            last_error: None,
            backtrace: Backtrace::generate(),
        });
    }

    let result = if let Some(timeout) = self.config.operation_timeout {
        self.execute_with_timeout_async(operation, timeout).await
    } else {
        operation().await
    };
    let duration = start_time.elapsed();

    // The trial is over; free its slot.
    {
        let mut inner = self.inner.write().unwrap();
        inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
    }

    match &result {
        Ok(_) => {
            self.record_success(duration);
            let close_circuit = {
                let inner = self.inner.read().unwrap();
                inner.consecutive_successes >= self.config.success_threshold_to_close
            };
            if close_circuit {
                self.transition_to_closed("Success threshold reached");
            }
        }
        Err(e) => {
            if self.should_count_as_failure(e) {
                self.record_failure(e, duration);
                self.transition_to_open("Failure in half-open state");
            } else {
                // Errors excluded by the predicate are treated as successes.
                self.record_success(duration);
            }
        }
    }
    result
}
/// Runs `operation` on the calling thread and checks afterwards whether it
/// took longer than `timeout`.
///
/// The operation is never interrupted: it always runs to completion. If it
/// overran, a timeout is recorded and its result is discarded in favour of a
/// `DecrustError::Timeout`.
#[cfg(not(feature = "std-thread"))]
fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret>,
{
    let started = Instant::now();
    let outcome = operation();
    if started.elapsed() <= timeout {
        return outcome;
    }

    self.record_timeout();
    Err(DecrustError::Timeout {
        operation: format!("Operation in circuit breaker '{}'", self.name),
        duration: timeout,
        backtrace: Backtrace::generate(),
    })
}
/// Runs `operation` on a freshly spawned thread and waits at most `timeout`
/// for its result.
///
/// If the result arrives in time the worker thread is joined and the result
/// returned. Otherwise a timeout is recorded and `DecrustError::Timeout` is
/// returned; the worker thread is not joined in that case and keeps running.
#[cfg(feature = "std-thread")]
fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel();
    let worker = std::thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that.
        let _ = sender.send(operation());
    });

    if let Ok(outcome) = receiver.recv_timeout(timeout) {
        let _ = worker.join();
        return outcome;
    }

    self.record_timeout();
    Err(DecrustError::Timeout {
        operation: format!("Operation in circuit breaker '{}'", self.name),
        duration: timeout,
        backtrace: Backtrace::generate(),
    })
}
/// Awaits the future produced by `operation`, giving up after `timeout`.
///
/// On expiry a timeout is recorded and `DecrustError::Timeout` is returned;
/// otherwise the future's own result is passed through.
#[cfg(feature = "tokio")]
async fn execute_with_timeout_async<F, Fut, Ret>(
    &self,
    operation: F,
    timeout: Duration,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    if let Ok(outcome) = time::timeout(timeout, operation()).await {
        return outcome;
    }

    self.record_timeout();
    Err(DecrustError::Timeout {
        operation: format!("Operation in circuit breaker '{}'", self.name),
        duration: timeout,
        backtrace: Backtrace::generate(),
    })
}
/// Moves the breaker to the open state, stamps the opening time, clears the
/// consecutive-success counter, logs the transition and notifies observers.
fn transition_to_open(&self, reason: &str) {
    let event = {
        let mut guard = self.inner.write().unwrap();
        let from_state = guard.state;

        guard.state = CircuitBreakerState::Open;
        guard.opened_at = Some(Instant::now());
        guard.consecutive_successes = 0;

        let event = CircuitTransitionEvent {
            from_state,
            to_state: CircuitBreakerState::Open,
            timestamp: SystemTime::now(),
            reason: reason.to_owned(),
        };
        guard.metrics.state = CircuitBreakerState::Open;
        guard.metrics.last_transition_timestamp = Some(SystemTime::now());
        event
    };
    // The lock is released before logging and before observers run.
    info!(
        "Circuit breaker '{}' transitioning to Open: {}",
        self.name, reason
    );
    self.notify_state_change(&event);
}
/// Moves the breaker to the half-open state, stamps the entry time, clears
/// the consecutive-success counter and the in-flight trial count, logs the
/// transition and notifies observers.
fn transition_to_half_open(&self, reason: &str) {
    let event = {
        let mut guard = self.inner.write().unwrap();
        let from_state = guard.state;

        guard.state = CircuitBreakerState::HalfOpen;
        guard.half_open_entered_at = Some(Instant::now());
        guard.consecutive_successes = 0;
        guard.half_open_concurrency_count = 0;

        let event = CircuitTransitionEvent {
            from_state,
            to_state: CircuitBreakerState::HalfOpen,
            timestamp: SystemTime::now(),
            reason: reason.to_owned(),
        };
        guard.metrics.state = CircuitBreakerState::HalfOpen;
        guard.metrics.last_transition_timestamp = Some(SystemTime::now());
        event
    };
    // The lock is released before logging and before observers run.
    info!(
        "Circuit breaker '{}' transitioning to HalfOpen: {}",
        self.name, reason
    );
    self.notify_state_change(&event);
}
/// Moves the breaker to the closed state, clears the open/half-open
/// timestamps and the consecutive-failure counter, logs the transition and
/// notifies observers.
fn transition_to_closed(&self, reason: &str) {
    let event = {
        let mut guard = self.inner.write().unwrap();
        let from_state = guard.state;

        guard.state = CircuitBreakerState::Closed;
        guard.opened_at = None;
        guard.half_open_entered_at = None;
        guard.consecutive_failures = 0;

        let event = CircuitTransitionEvent {
            from_state,
            to_state: CircuitBreakerState::Closed,
            timestamp: SystemTime::now(),
            reason: reason.to_owned(),
        };
        guard.metrics.state = CircuitBreakerState::Closed;
        guard.metrics.last_transition_timestamp = Some(SystemTime::now());
        event
    };
    // The lock is released before logging and before observers run.
    info!(
        "Circuit breaker '{}' transitioning to Closed: {}",
        self.name, reason
    );
    self.notify_state_change(&event);
}
/// Records one successful call that took `duration`.
///
/// Extends the success streak, clears the failure streak, appends to both
/// sliding windows (evicting the oldest entry once a window holds
/// `sliding_window_size` entries), refreshes the metrics and then notifies
/// observers with `CircuitOperationType::Success`.
fn record_success(&self, duration: Duration) {
    let window_cap = self.config.sliding_window_size;
    // Without a configured threshold no call is ever considered slow.
    let was_slow = match self.config.slow_call_duration_threshold {
        Some(limit) => duration >= limit,
        None => false,
    };

    {
        let mut guard = self.inner.write().unwrap();
        guard.consecutive_successes += 1;
        guard.consecutive_failures = 0;

        if guard.results_window.len() >= window_cap {
            guard.results_window.pop_front();
        }
        guard.results_window.push_back(true);

        if guard.slow_call_window.len() >= window_cap {
            guard.slow_call_window.pop_front();
        }
        guard.slow_call_window.push_back(was_slow);

        let streak = guard.consecutive_successes as u32;
        guard.metrics.total_requests += 1;
        guard.metrics.successful_requests += 1;
        guard.metrics.consecutive_successes = streak;
        guard.metrics.consecutive_failures = 0;
        self.update_rates(&mut guard);
    }
    // The lock is released before observers run.
    self.notify_operation_result(CircuitOperationType::Success, duration, None);
}
fn record_failure(&self, error: &DecrustError, duration: Duration) {
let mut inner = self.inner.write().unwrap();
inner.consecutive_failures += 1;
inner.consecutive_successes = 0;
if inner.results_window.len() >= self.config.sliding_window_size {
inner.results_window.pop_front();
}
inner.results_window.push_back(false);
let was_slow = if let Some(threshold) = self.config.slow_call_duration_threshold {
duration >= threshold
} else {
false
};
if inner.slow_call_window.len() >= self.config.sliding_window_size {
inner.slow_call_window.pop_front();
}
inner.slow_call_window.push_back(was_slow);
inner.metrics.total_requests += 1;
inner.metrics.failed_requests += 1;
inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
inner.metrics.consecutive_successes = 0;
inner.metrics.last_error_timestamp = Some(SystemTime::now());
self.update_rates(&mut inner);
let error_clone = error.clone(); drop(inner);
self.notify_operation_result(CircuitOperationType::Failure, duration, Some(&error_clone));
}
/// Counts one rejected call in the metrics and notifies observers with
/// `CircuitOperationType::Rejected` and a zero duration.
///
/// Takes the state write lock, so the caller must not hold it.
fn record_rejected(&self) {
    {
        let mut guard = self.inner.write().unwrap();
        guard.metrics.total_requests += 1;
        guard.metrics.rejected_requests += 1;
    }
    self.notify_operation_result(CircuitOperationType::Rejected, Duration::from_secs(0), None);
}
/// Records one timed-out call.
///
/// Extends the failure streak, clears the success streak, appends a failure
/// to the results window (the slow-call window is not touched), refreshes
/// the metrics and then notifies observers with
/// `CircuitOperationType::Timeout`, the configured operation timeout as the
/// duration and a freshly built `Timeout` error.
fn record_timeout(&self) {
    {
        let mut guard = self.inner.write().unwrap();
        guard.consecutive_failures += 1;
        guard.consecutive_successes = 0;

        if guard.results_window.len() >= self.config.sliding_window_size {
            guard.results_window.pop_front();
        }
        guard.results_window.push_back(false);

        let streak = guard.consecutive_failures as u32;
        guard.metrics.total_requests += 1;
        guard.metrics.timeout_requests += 1;
        guard.metrics.consecutive_failures = streak;
        guard.metrics.consecutive_successes = 0;
        guard.metrics.last_error_timestamp = Some(SystemTime::now());
        self.update_rates(&mut guard);
    }

    // Zero when no operation timeout is configured.
    let configured = self.config.operation_timeout.unwrap_or_default();
    let timeout_error = DecrustError::Timeout {
        operation: format!("Operation in circuit breaker '{}'", self.name),
        duration: configured,
        backtrace: Backtrace::generate(),
    };
    self.notify_operation_result(
        CircuitOperationType::Timeout,
        configured,
        Some(&timeout_error),
    );
}
/// Decides whether the breaker should open, based on the current counters.
///
/// Returns `true` when any of these holds:
/// 1. the failure streak has reached `failure_threshold`;
/// 2. the results window holds at least `minimum_request_threshold_for_rate`
///    entries and its failure fraction has reached `failure_rate_threshold`;
/// 3. a slow-call rate threshold is configured, the slow-call window is
///    non-empty and its slow fraction has reached that threshold.
fn should_open_circuit(&self) -> bool {
    let inner = self.inner.read().unwrap();
    let config = &self.config;

    if inner.consecutive_failures >= config.failure_threshold {
        return true;
    }

    let outcomes = &inner.results_window;
    if outcomes.len() >= config.minimum_request_threshold_for_rate {
        let failed = outcomes.iter().filter(|ok| !**ok).count();
        if failed as f64 / outcomes.len() as f64 >= config.failure_rate_threshold {
            return true;
        }
    }

    let slow_calls = &inner.slow_call_window;
    match config.slow_call_rate_threshold {
        Some(limit) if !slow_calls.is_empty() => {
            let slow = slow_calls.iter().filter(|is_slow| **is_slow).count();
            slow as f64 / slow_calls.len() as f64 >= limit
        }
        _ => false,
    }
}
/// Returns whether `error` counts as a failure: the configured predicate's
/// verdict if one is set, otherwise always `true`.
fn should_count_as_failure(&self, error: &DecrustError) -> bool {
    match &self.config.error_predicate {
        Some(predicate) => (predicate.as_ref())(error),
        None => true,
    }
}
fn update_rates(&self, inner: &mut InnerState) {
if inner.results_window.is_empty() {
inner.metrics.failure_rate_in_window = None;
} else {
let failure_count = inner
.results_window
.iter()
.filter(|&&success| !success)
.count();
let failure_rate = failure_count as f64 / inner.results_window.len() as f64;
inner.metrics.failure_rate_in_window = Some(failure_rate);
}
if inner.slow_call_window.is_empty() {
inner.metrics.slow_call_rate_in_window = None;
} else {
let slow_count = inner.slow_call_window.iter().filter(|&&slow| slow).count();
let slow_rate = slow_count as f64 / inner.slow_call_window.len() as f64;
inner.metrics.slow_call_rate_in_window = Some(slow_rate);
}
}
/// Calls `on_state_change` on every registered observer, in registration
/// order, while holding the observer lock.
fn notify_state_change(&self, event: &CircuitTransitionEvent) {
    self.observers
        .lock()
        .unwrap()
        .iter()
        .for_each(|observer| observer.on_state_change(&self.name, event));
}
/// Calls `on_operation_attempt` on every registered observer, in
/// registration order, while holding the observer lock.
fn notify_operation_attempt(&self, state: CircuitBreakerState) {
    self.observers
        .lock()
        .unwrap()
        .iter()
        .for_each(|observer| observer.on_operation_attempt(&self.name, state));
}
/// Calls `on_operation_result` on every registered observer, in registration
/// order, while holding the observer lock.
fn notify_operation_result(
    &self,
    op_type: CircuitOperationType,
    duration: Duration,
    error: Option<&DecrustError>,
) {
    self.observers
        .lock()
        .unwrap()
        .iter()
        .for_each(|observer| observer.on_operation_result(&self.name, op_type, duration, error));
}
/// Calls `on_reset` on every registered observer, in registration order,
/// while holding the observer lock.
fn notify_reset(&self) {
    self.observers
        .lock()
        .unwrap()
        .iter()
        .for_each(|observer| observer.on_reset(&self.name));
}
}