1#![warn(missing_docs)]
3use super::backtrace::DecrustBacktrace as Backtrace;
26use super::{DecrustError, Result};
27use std::collections::VecDeque;
28use std::fmt;
29use std::sync::{Arc, Mutex, RwLock};
30use std::time::{Duration, Instant, SystemTime};
31use tracing::info;
32
/// Transparent wrapper whose `Debug` output never inspects the wrapped value.
///
/// The inner value stays reachable through the public tuple field, so `T`
/// itself does not have to implement `Debug`.
pub struct DebugIgnore<T: ?Sized>(pub T);

impl<T: ?Sized> fmt::Debug for DebugIgnore<T> {
    /// Always emits the fixed placeholder `<function>`, whatever `T` is.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("<function>")
    }
}

impl<T: Clone> Clone for DebugIgnore<T> {
    /// Clones the wrapped value and re-wraps it.
    fn clone(&self) -> Self {
        let Self(wrapped) = self;
        Self(wrapped.clone())
    }
}
47
48#[cfg(feature = "rand")]
49#[allow(unused_imports)]
50use rand::Rng;
51#[cfg(feature = "tokio")]
52use tokio::time;
53
/// The three states a circuit breaker can be in.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum CircuitBreakerState {
    /// Calls are passed through to the operation. This is the default state.
    #[default]
    Closed,
    /// Calls are rejected until the reset timeout has elapsed.
    Open,
    /// A limited number of trial calls are let through to probe for recovery.
    HalfOpen,
}

impl fmt::Display for CircuitBreakerState {
    /// Writes the bare variant name (`Closed`, `Open` or `HalfOpen`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Closed => "Closed",
            Self::Open => "Open",
            Self::HalfOpen => "HalfOpen",
        };
        f.write_str(label)
    }
}
71
/// Outcome category reported to observers for a single call.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum CircuitOperationType {
    /// The call was recorded as a success.
    Success,
    /// The call was recorded as a failure.
    Failure,
    /// The call was refused by the breaker without being run.
    Rejected,
    /// The call exceeded the configured operation timeout.
    Timeout,
}
84
85#[derive(Debug, Clone)]
87pub struct CircuitTransitionEvent {
88 pub from_state: CircuitBreakerState,
90 pub to_state: CircuitBreakerState,
92 pub timestamp: SystemTime,
94 pub reason: String,
96}
97
98pub trait CircuitBreakerObserver: Send + Sync {
103 fn on_state_change(&self, name: &str, event: &CircuitTransitionEvent);
105 fn on_operation_attempt(&self, name: &str, state: CircuitBreakerState);
107 fn on_operation_result(
109 &self,
110 name: &str,
111 op_type: CircuitOperationType,
112 duration: Duration,
113 error: Option<&DecrustError>,
114 );
115 fn on_reset(&self, name: &str);
117}
118
119#[derive(Debug, Clone, Default)]
121pub struct CircuitMetrics {
122 pub state: CircuitBreakerState,
124 pub total_requests: u64,
126 pub successful_requests: u64,
128 pub failed_requests: u64,
130 pub rejected_requests: u64,
132 pub timeout_requests: u64,
134 pub consecutive_failures: u32,
136 pub consecutive_successes: u32,
138 pub last_error_timestamp: Option<SystemTime>,
140 pub last_transition_timestamp: Option<SystemTime>,
142 pub failure_rate_in_window: Option<f64>,
144 pub slow_call_rate_in_window: Option<f64>,
146}
147
148pub type ErrorPredicate = Arc<dyn Fn(&DecrustError) -> bool + Send + Sync>;
150
151#[derive(Clone)]
155pub struct CircuitBreakerConfig {
156 pub failure_threshold: usize,
158 pub failure_rate_threshold: f64,
160 pub minimum_request_threshold_for_rate: usize,
162 pub success_threshold_to_close: usize,
164 pub reset_timeout: Duration,
166 pub half_open_max_concurrent_operations: usize,
168 pub operation_timeout: Option<Duration>,
170 pub sliding_window_size: usize,
172 pub error_predicate: Option<ErrorPredicate>,
175 pub metrics_window_size: usize, pub track_metrics: bool,
179 pub slow_call_duration_threshold: Option<Duration>,
181 pub slow_call_rate_threshold: Option<f64>,
183 pub circuit_breaker_threshold: u32,
185 pub circuit_breaker_cooldown: Duration,
187}
188
189impl Default for CircuitBreakerConfig {
190 fn default() -> Self {
191 Self {
192 failure_threshold: 5,
193 failure_rate_threshold: 0.5,
194 minimum_request_threshold_for_rate: 10,
195 success_threshold_to_close: 3,
196 reset_timeout: Duration::from_secs(30),
197 half_open_max_concurrent_operations: 1,
198 operation_timeout: Some(Duration::from_secs(5)),
199 sliding_window_size: 100,
200 error_predicate: None,
201 metrics_window_size: 100, track_metrics: true,
203 slow_call_duration_threshold: None, slow_call_rate_threshold: None, circuit_breaker_threshold: 3, circuit_breaker_cooldown: Duration::from_secs(60), }
208 }
209}
210
211impl fmt::Debug for CircuitBreakerConfig {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 f.debug_struct("CircuitBreakerConfig")
214 .field("failure_threshold", &self.failure_threshold)
215 .field("failure_rate_threshold", &self.failure_rate_threshold)
216 .field(
217 "minimum_request_threshold_for_rate",
218 &self.minimum_request_threshold_for_rate,
219 )
220 .field(
221 "success_threshold_to_close",
222 &self.success_threshold_to_close,
223 )
224 .field("reset_timeout", &self.reset_timeout)
225 .field(
226 "half_open_max_concurrent_operations",
227 &self.half_open_max_concurrent_operations,
228 )
229 .field("operation_timeout", &self.operation_timeout)
230 .field("sliding_window_size", &self.sliding_window_size)
231 .field(
232 "error_predicate",
233 &if self.error_predicate.is_some() {
234 "Some(<function>)"
235 } else {
236 "None"
237 },
238 )
239 .field("metrics_window_size", &self.metrics_window_size)
240 .field("track_metrics", &self.track_metrics)
241 .field(
242 "slow_call_duration_threshold",
243 &self.slow_call_duration_threshold,
244 )
245 .field("slow_call_rate_threshold", &self.slow_call_rate_threshold)
246 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
247 .field("circuit_breaker_cooldown", &self.circuit_breaker_cooldown)
248 .finish()
249 }
250}
251
252#[derive(Debug)] struct InnerState {
254 state: CircuitBreakerState,
255 opened_at: Option<Instant>,
256 half_open_entered_at: Option<Instant>,
257 consecutive_failures: usize,
258 consecutive_successes: usize,
259 half_open_concurrency_count: usize,
260 results_window: VecDeque<bool>, slow_call_window: VecDeque<bool>, metrics: CircuitMetrics,
263}
264
265impl Default for InnerState {
266 fn default() -> Self {
267 Self {
268 state: CircuitBreakerState::Closed,
269 opened_at: None,
270 half_open_entered_at: None,
271 consecutive_failures: 0,
272 consecutive_successes: 0,
273 half_open_concurrency_count: 0,
274 results_window: VecDeque::with_capacity(100),
275 slow_call_window: VecDeque::with_capacity(100),
276 metrics: CircuitMetrics::default(),
277 }
278 }
279}
280
281pub struct CircuitBreaker {
283 name: String,
284 config: CircuitBreakerConfig,
285 inner: RwLock<InnerState>,
286 observers: Mutex<Vec<Arc<dyn CircuitBreakerObserver>>>,
287}
288
289impl CircuitBreaker {
290 pub fn new(name: impl Into<String>, config: CircuitBreakerConfig) -> Arc<Self> {
292 Arc::new(Self {
293 name: name.into(),
294 config,
295 inner: RwLock::new(InnerState::default()),
296 observers: Mutex::new(Vec::new()),
297 })
298 }
299
300 pub fn add_observer(&self, observer: Arc<dyn CircuitBreakerObserver>) {
302 let mut observers = self.observers.lock().unwrap();
303 observers.push(observer);
304 }
305
306 pub fn state(&self) -> CircuitBreakerState {
308 let inner = self.inner.read().unwrap();
309 inner.state
310 }
311
312 pub fn metrics(&self) -> CircuitMetrics {
314 let inner = self.inner.read().unwrap();
315 inner.metrics.clone()
316 }
317
318 pub fn trip(&self) {
320 let mut inner = self.inner.write().unwrap();
321 let prev_state = inner.state;
322 inner.state = CircuitBreakerState::Open;
323 inner.opened_at = Some(Instant::now());
324 inner.consecutive_failures = self.config.failure_threshold;
325 inner.consecutive_successes = 0;
326
327 let event = CircuitTransitionEvent {
328 from_state: prev_state,
329 to_state: CircuitBreakerState::Open,
330 timestamp: SystemTime::now(),
331 reason: "Manual trip".to_string(),
332 };
333
334 inner.metrics.state = CircuitBreakerState::Open;
336 inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
337 inner.metrics.consecutive_successes = 0;
338 inner.metrics.last_transition_timestamp = Some(SystemTime::now());
339
340 drop(inner);
342
343 self.notify_state_change(&event);
345 }
346
347 pub fn reset(&self) {
349 let mut inner = self.inner.write().unwrap();
350 let prev_state = inner.state;
351 inner.state = CircuitBreakerState::Closed;
352 inner.opened_at = None;
353 inner.half_open_entered_at = None;
354 inner.consecutive_failures = 0;
355 inner.consecutive_successes = 0;
356 inner.half_open_concurrency_count = 0;
357
358 inner.metrics.state = CircuitBreakerState::Closed;
360 inner.metrics.consecutive_failures = 0;
361 inner.metrics.consecutive_successes = 0;
362 inner.metrics.last_transition_timestamp = Some(SystemTime::now());
363
364 inner.results_window.clear();
366 inner.slow_call_window.clear();
367
368 let event = CircuitTransitionEvent {
369 from_state: prev_state,
370 to_state: CircuitBreakerState::Closed,
371 timestamp: SystemTime::now(),
372 reason: "Manual reset".to_string(),
373 };
374
375 drop(inner);
377
378 self.notify_state_change(&event);
380 self.notify_reset();
381 }
382
383 #[cfg(not(feature = "std-thread"))]
385 pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
386 where
387 F: FnOnce() -> Result<Ret>,
388 {
389 let start_time = Instant::now();
390 let state = self.state();
391
392 self.notify_operation_attempt(state);
393
394 match state {
395 CircuitBreakerState::Open => {
396 let inner = self.inner.read().unwrap();
398 let should_transition = if let Some(opened_at) = inner.opened_at {
399 opened_at.elapsed() >= self.config.reset_timeout
400 } else {
401 false
402 };
403 drop(inner);
404
405 if should_transition {
406 self.transition_to_half_open("Reset timeout elapsed");
407 self.execute_half_open(operation, start_time)
409 } else {
410 self.record_rejected();
412 Err(DecrustError::CircuitBreakerOpen {
413 name: self.name.clone(),
414 retry_after: Some(
415 self.config
416 .reset_timeout
417 .checked_sub(
418 self.inner.read().unwrap().opened_at.unwrap().elapsed(),
419 )
420 .unwrap_or_default(),
421 ),
422 failure_count: None,
423 last_error: None,
424 backtrace: Backtrace::generate(),
425 })
426 }
427 }
428 CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
429 CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
430 }
431 }
432
/// Runs `operation` under the protection of this circuit breaker.
///
/// This variant is compiled with the `std-thread` feature; the operation
/// must be `Send + 'static` because timeout handling may run it on another
/// thread.
///
/// The state is sampled once and observers are told about the attempt.
/// Then:
/// * `Closed`: the operation runs and its outcome is recorded.
/// * `HalfOpen`: the operation runs as a probe, subject to the half-open
///   concurrency limit.
/// * `Open`: if the reset timeout has elapsed the breaker moves to
///   half-open and the call runs as a probe; otherwise the call is rejected.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` when the call is rejected, with
/// `retry_after` set to the remaining part of the reset timeout (zero if
/// that cannot be determined) and `failure_count` set to the current
/// consecutive-failure count. Otherwise returns whatever the operation or
/// the timeout handling returns.
///
/// # Panics
///
/// Panics if the state lock or the observer mutex is poisoned.
#[cfg(feature = "std-thread")]
pub fn execute<F, Ret>(&self, operation: F) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    let start_time = Instant::now();
    let state = self.state();

    self.notify_operation_attempt(state);

    match state {
        CircuitBreakerState::Open => {
            // Take one consistent snapshot under a single read lock. Another
            // thread may close or reset the breaker after the state check
            // above, which clears `opened_at`; re-reading and unwrapping it
            // later would then panic.
            let (opened_at, consecutive_failures) = {
                let inner = self.inner.read().unwrap();
                (inner.opened_at, inner.consecutive_failures)
            };
            let should_transition =
                opened_at.map_or(false, |at| at.elapsed() >= self.config.reset_timeout);

            if should_transition {
                self.transition_to_half_open("Reset timeout elapsed");
                self.execute_half_open(operation, start_time)
            } else {
                self.record_rejected();

                // A missing `opened_at` means the breaker was closed
                // concurrently, so the caller may retry immediately.
                let retry_after = opened_at
                    .and_then(|at| self.config.reset_timeout.checked_sub(at.elapsed()))
                    .unwrap_or_default();

                Err(DecrustError::CircuitBreakerOpen {
                    name: self.name.clone(),
                    retry_after: Some(retry_after),
                    failure_count: Some(consecutive_failures as u32),
                    last_error: None,
                    backtrace: Backtrace::generate(),
                })
            }
        }
        CircuitBreakerState::HalfOpen => self.execute_half_open(operation, start_time),
        CircuitBreakerState::Closed => self.execute_closed(operation, start_time),
    }
}
483
/// Runs an asynchronous `operation` under the protection of this breaker.
///
/// The state is sampled once and observers are told about the attempt.
/// Then:
/// * `Closed`: the operation runs and its outcome is recorded.
/// * `HalfOpen`: the operation runs as a probe, subject to the half-open
///   concurrency limit.
/// * `Open`: if the reset timeout has elapsed the breaker moves to
///   half-open and the call runs as a probe; otherwise the call is rejected.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` when the call is rejected, with
/// `retry_after` set to the remaining part of the reset timeout (zero if
/// that cannot be determined) and `failure_count` set to the current
/// consecutive-failure count. Otherwise returns whatever the operation or
/// the timeout handling returns.
///
/// # Panics
///
/// Panics if the state lock or the observer mutex is poisoned.
#[cfg(feature = "tokio")]
pub async fn execute_async<F, Fut, Ret>(&self, operation: F) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    let start_time = Instant::now();
    let state = self.state();

    self.notify_operation_attempt(state);

    match state {
        CircuitBreakerState::Open => {
            // Take one consistent snapshot under a single read lock. Another
            // task may close or reset the breaker after the state check
            // above, which clears `opened_at`; re-reading and unwrapping it
            // later would then panic. The guard is confined to this block,
            // so it is released before any `.await`.
            let (opened_at, consecutive_failures) = {
                let inner = self.inner.read().unwrap();
                (inner.opened_at, inner.consecutive_failures)
            };
            let should_transition =
                opened_at.map_or(false, |at| at.elapsed() >= self.config.reset_timeout);

            if should_transition {
                self.transition_to_half_open("Reset timeout elapsed");
                self.execute_half_open_async(operation, start_time).await
            } else {
                self.record_rejected();

                // A missing `opened_at` means the breaker was closed
                // concurrently, so the caller may retry immediately.
                let retry_after = opened_at
                    .and_then(|at| self.config.reset_timeout.checked_sub(at.elapsed()))
                    .unwrap_or_default();

                Err(DecrustError::CircuitBreakerOpen {
                    name: self.name.clone(),
                    retry_after: Some(retry_after),
                    failure_count: Some(consecutive_failures as u32),
                    last_error: None,
                    backtrace: Backtrace::generate(),
                })
            }
        }
        CircuitBreakerState::HalfOpen => {
            self.execute_half_open_async(operation, start_time).await
        }
        CircuitBreakerState::Closed => self.execute_closed_async(operation, start_time).await,
    }
}
536
537 #[cfg(not(feature = "std-thread"))]
541 fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
542 where
543 F: FnOnce() -> Result<Ret>,
544 {
545 let result = if let Some(timeout) = self.config.operation_timeout {
546 self.execute_with_timeout(operation, timeout)
547 } else {
548 operation()
549 };
550
551 let duration = start_time.elapsed();
552
553 match &result {
554 Ok(_) => {
555 self.record_success(duration);
556 }
557 Err(e) => {
558 if self.should_count_as_failure(e) {
559 self.record_failure(e, duration);
560
561 if self.should_open_circuit() {
563 self.transition_to_open("Failure threshold reached");
564 }
565 } else {
566 self.record_success(duration);
568 }
569 }
570 }
571
572 result
573 }
574
/// Runs `operation` while the circuit is closed and records its outcome
/// (`std-thread` variant).
///
/// Errors rejected by the configured error predicate are recorded as
/// successes. After a counted failure the opening criteria are checked and
/// the circuit is opened if they are met. The operation's own result is
/// always returned unchanged.
///
/// NOTE(review): when `execute_with_timeout` reports a timeout it has
/// already called `record_timeout`; the failure arm below then records the
/// same call again through `record_failure`. Confirm this is intended.
#[cfg(feature = "std-thread")]
fn execute_closed<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    let result = match self.config.operation_timeout {
        Some(timeout) => self.execute_with_timeout(operation, timeout),
        None => operation(),
    };
    let duration = start_time.elapsed();

    match result.as_ref() {
        Err(e) if self.should_count_as_failure(e) => {
            self.record_failure(e, duration);
            if self.should_open_circuit() {
                self.transition_to_open("Failure threshold reached");
            }
        }
        // Real successes and errors the predicate chose to ignore.
        Ok(_) | Err(_) => self.record_success(duration),
    }

    result
}
610
611 #[cfg(not(feature = "std-thread"))]
613 fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
614 where
615 F: FnOnce() -> Result<Ret>,
616 {
617 {
619 let mut inner = self.inner.write().unwrap();
620 if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
621 {
622 self.record_rejected();
624 return Err(DecrustError::CircuitBreakerOpen {
625 name: self.name.clone(),
626 retry_after: Some(Duration::from_millis(100)),
627 failure_count: None,
628 last_error: None,
629 backtrace: Backtrace::generate(),
630 });
631 }
632
633 inner.half_open_concurrency_count += 1;
635 }
636
637 let result = if let Some(timeout) = self.config.operation_timeout {
639 self.execute_with_timeout(operation, timeout)
640 } else {
641 operation()
642 };
643
644 let duration = start_time.elapsed();
645
646 {
648 let mut inner = self.inner.write().unwrap();
649 inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
650 }
651
652 match &result {
653 Ok(_) => {
654 self.record_success(duration);
655
656 let close_circuit = {
658 let inner = self.inner.read().unwrap();
659 inner.consecutive_successes >= self.config.success_threshold_to_close
660 };
661
662 if close_circuit {
663 self.transition_to_closed("Success threshold reached");
664 }
665 }
666 Err(e) => {
667 if self.should_count_as_failure(e) {
668 self.record_failure(e, duration);
669
670 self.transition_to_open("Failure in half-open state");
672 } else {
673 self.record_success(duration);
675 }
676 }
677 }
678
679 result
680 }
681
/// Runs `operation` as a probe while the circuit is half-open
/// (`std-thread` variant).
///
/// At most `half_open_max_concurrent_operations` probes may be in flight.
/// A successful probe closes the circuit once the consecutive-success
/// threshold is reached; a counted failure re-opens it immediately. Errors
/// rejected by the error predicate are recorded as successes.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` with a 100 ms `retry_after`
/// and the current consecutive-failure count when the concurrency limit is
/// already reached. Otherwise returns the operation's own result.
#[cfg(feature = "std-thread")]
fn execute_half_open<F, Ret>(&self, operation: F, start_time: Instant) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    // Claim a probe slot. The write guard is confined to this block so that
    // it is released before `record_rejected` runs: that method takes the
    // same write lock, and `RwLock` is not re-entrant, so calling it (or
    // taking another read lock) with the guard still alive would deadlock
    // or panic. The failure count for the error is therefore read from the
    // guard itself; `Some(count)` means the call was rejected.
    let rejected_with_failures = {
        let mut inner = self.inner.write().unwrap();
        if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
        {
            Some(inner.consecutive_failures)
        } else {
            inner.half_open_concurrency_count += 1;
            None
        }
    };

    if let Some(consecutive_failures) = rejected_with_failures {
        self.record_rejected();
        return Err(DecrustError::CircuitBreakerOpen {
            name: self.name.clone(),
            retry_after: Some(Duration::from_millis(100)),
            failure_count: Some(consecutive_failures as u32),
            last_error: None,
            backtrace: Backtrace::generate(),
        });
    }

    let result = if let Some(timeout) = self.config.operation_timeout {
        self.execute_with_timeout(operation, timeout)
    } else {
        operation()
    };

    let duration = start_time.elapsed();

    // Give the probe slot back before recording the outcome.
    {
        let mut inner = self.inner.write().unwrap();
        inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
    }

    match &result {
        Ok(_) => {
            self.record_success(duration);

            let close_circuit = {
                let inner = self.inner.read().unwrap();
                inner.consecutive_successes >= self.config.success_threshold_to_close
            };

            if close_circuit {
                self.transition_to_closed("Success threshold reached");
            }
        }
        Err(e) => {
            if self.should_count_as_failure(e) {
                self.record_failure(e, duration);
                // Any counted failure during probing re-opens the circuit.
                self.transition_to_open("Failure in half-open state");
            } else {
                self.record_success(duration);
            }
        }
    }

    result
}
752
/// Awaits `operation` while the circuit is closed and records its outcome.
///
/// Errors rejected by the configured error predicate are recorded as
/// successes. After a counted failure the opening criteria are checked and
/// the circuit is opened if they are met. The operation's own result is
/// always returned unchanged.
///
/// NOTE(review): when `execute_with_timeout_async` reports a timeout it has
/// already called `record_timeout`; the failure arm below then records the
/// same call again through `record_failure`. Confirm this is intended.
#[cfg(feature = "tokio")]
async fn execute_closed_async<F, Fut, Ret>(
    &self,
    operation: F,
    start_time: Instant,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    let result = match self.config.operation_timeout {
        Some(timeout) => self.execute_with_timeout_async(operation, timeout).await,
        None => operation().await,
    };
    let duration = start_time.elapsed();

    match result.as_ref() {
        Err(e) if self.should_count_as_failure(e) => {
            self.record_failure(e, duration);
            if self.should_open_circuit() {
                self.transition_to_open("Failure threshold reached");
            }
        }
        // Real successes and errors the predicate chose to ignore.
        Ok(_) | Err(_) => self.record_success(duration),
    }

    result
}
794
/// Awaits `operation` as a probe while the circuit is half-open.
///
/// At most `half_open_max_concurrent_operations` probes may be in flight.
/// A successful probe closes the circuit once the consecutive-success
/// threshold is reached; a counted failure re-opens it immediately. Errors
/// rejected by the error predicate are recorded as successes.
///
/// # Errors
///
/// Returns `DecrustError::CircuitBreakerOpen` with a 100 ms `retry_after`
/// and the current consecutive-failure count when the concurrency limit is
/// already reached. Otherwise returns the operation's own result.
#[cfg(feature = "tokio")]
async fn execute_half_open_async<F, Fut, Ret>(
    &self,
    operation: F,
    start_time: Instant,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    // Claim a probe slot. The write guard is confined to this block so that
    // it is released before `record_rejected` runs: that method takes the
    // same write lock, and `RwLock` is not re-entrant, so calling it (or
    // taking another read lock) with the guard still alive would deadlock
    // or panic. The failure count for the error is therefore read from the
    // guard itself; `Some(count)` means the call was rejected.
    let rejected_with_failures = {
        let mut inner = self.inner.write().unwrap();
        if inner.half_open_concurrency_count >= self.config.half_open_max_concurrent_operations
        {
            Some(inner.consecutive_failures)
        } else {
            inner.half_open_concurrency_count += 1;
            None
        }
    };

    if let Some(consecutive_failures) = rejected_with_failures {
        self.record_rejected();
        return Err(DecrustError::CircuitBreakerOpen {
            name: self.name.clone(),
            retry_after: Some(Duration::from_millis(100)),
            failure_count: Some(consecutive_failures as u32),
            last_error: None,
            backtrace: Backtrace::generate(),
        });
    }

    let result = if let Some(timeout) = self.config.operation_timeout {
        self.execute_with_timeout_async(operation, timeout).await
    } else {
        operation().await
    };

    let duration = start_time.elapsed();

    // Give the probe slot back before recording the outcome.
    {
        let mut inner = self.inner.write().unwrap();
        inner.half_open_concurrency_count = inner.half_open_concurrency_count.saturating_sub(1);
    }

    match &result {
        Ok(_) => {
            self.record_success(duration);

            let close_circuit = {
                let inner = self.inner.read().unwrap();
                inner.consecutive_successes >= self.config.success_threshold_to_close
            };

            if close_circuit {
                self.transition_to_closed("Success threshold reached");
            }
        }
        Err(e) => {
            if self.should_count_as_failure(e) {
                self.record_failure(e, duration);
                // Any counted failure during probing re-opens the circuit.
                self.transition_to_open("Failure in half-open state");
            } else {
                self.record_success(duration);
            }
        }
    }

    result
}
869
870 #[cfg(not(feature = "std-thread"))]
874 fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
875 where
876 F: FnOnce() -> Result<Ret>,
877 {
878 let start = Instant::now();
880 let result = operation();
881 if start.elapsed() > timeout {
882 self.record_timeout();
883 Err(DecrustError::Timeout {
884 operation: format!("Operation in circuit breaker '{}'", self.name),
885 duration: timeout,
886 backtrace: Backtrace::generate(),
887 })
888 } else {
889 result
890 }
891 }
892
/// Runs `operation` on a freshly spawned thread and waits up to `timeout`.
///
/// If the result arrives in time, the worker is joined and the result is
/// returned. Any receive error is treated as a timeout: a timeout is
/// recorded and `DecrustError::Timeout` is returned. In that case the worker
/// is not joined and its handle is simply dropped.
#[cfg(feature = "std-thread")]
fn execute_with_timeout<F, Ret>(&self, operation: F, timeout: Duration) -> Result<Ret>
where
    F: FnOnce() -> Result<Ret> + Send + 'static,
    Ret: Send + 'static,
{
    use std::sync::mpsc;
    use std::thread;

    let (sender, receiver) = mpsc::channel();
    let worker = thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that.
        let _ = sender.send(operation());
    });

    if let Ok(outcome) = receiver.recv_timeout(timeout) {
        let _ = worker.join();
        return outcome;
    }

    self.record_timeout();
    Err(DecrustError::Timeout {
        operation: format!("Operation in circuit breaker '{}'", self.name),
        duration: timeout,
        backtrace: Backtrace::generate(),
    })
}
927
/// Awaits the future produced by `operation`, giving up after `timeout`.
///
/// If the time limit expires first, a timeout is recorded and
/// `DecrustError::Timeout` is returned; otherwise the operation's own result
/// is passed through.
#[cfg(feature = "tokio")]
async fn execute_with_timeout_async<F, Fut, Ret>(
    &self,
    operation: F,
    timeout: Duration,
) -> Result<Ret>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Ret>>,
{
    time::timeout(timeout, operation())
        .await
        .unwrap_or_else(|_elapsed| {
            self.record_timeout();
            Err(DecrustError::Timeout {
                operation: format!("Operation in circuit breaker '{}'", self.name),
                duration: timeout,
                backtrace: Backtrace::generate(),
            })
        })
}
950
951 fn transition_to_open(&self, reason: &str) {
954 let mut inner = self.inner.write().unwrap();
955 let prev_state = inner.state;
956 inner.state = CircuitBreakerState::Open;
957 inner.opened_at = Some(Instant::now());
958 inner.consecutive_successes = 0;
959
960 let event = CircuitTransitionEvent {
961 from_state: prev_state,
962 to_state: CircuitBreakerState::Open,
963 timestamp: SystemTime::now(),
964 reason: reason.to_string(),
965 };
966
967 inner.metrics.state = CircuitBreakerState::Open;
969 inner.metrics.last_transition_timestamp = Some(SystemTime::now());
970
971 drop(inner);
973
974 info!(
975 "Circuit breaker '{}' transitioning to Open: {}",
976 self.name, reason
977 );
978 self.notify_state_change(&event);
979 }
980
981 fn transition_to_half_open(&self, reason: &str) {
982 let mut inner = self.inner.write().unwrap();
983 let prev_state = inner.state;
984 inner.state = CircuitBreakerState::HalfOpen;
985 inner.half_open_entered_at = Some(Instant::now());
986 inner.consecutive_successes = 0;
987 inner.half_open_concurrency_count = 0;
988
989 let event = CircuitTransitionEvent {
990 from_state: prev_state,
991 to_state: CircuitBreakerState::HalfOpen,
992 timestamp: SystemTime::now(),
993 reason: reason.to_string(),
994 };
995
996 inner.metrics.state = CircuitBreakerState::HalfOpen;
998 inner.metrics.last_transition_timestamp = Some(SystemTime::now());
999
1000 drop(inner);
1002
1003 info!(
1004 "Circuit breaker '{}' transitioning to HalfOpen: {}",
1005 self.name, reason
1006 );
1007 self.notify_state_change(&event);
1008 }
1009
1010 fn transition_to_closed(&self, reason: &str) {
1011 let mut inner = self.inner.write().unwrap();
1012 let prev_state = inner.state;
1013 inner.state = CircuitBreakerState::Closed;
1014 inner.opened_at = None;
1015 inner.half_open_entered_at = None;
1016 inner.consecutive_failures = 0;
1017
1018 let event = CircuitTransitionEvent {
1019 from_state: prev_state,
1020 to_state: CircuitBreakerState::Closed,
1021 timestamp: SystemTime::now(),
1022 reason: reason.to_string(),
1023 };
1024
1025 inner.metrics.state = CircuitBreakerState::Closed;
1027 inner.metrics.last_transition_timestamp = Some(SystemTime::now());
1028
1029 drop(inner);
1031
1032 info!(
1033 "Circuit breaker '{}' transitioning to Closed: {}",
1034 self.name, reason
1035 );
1036 self.notify_state_change(&event);
1037 }
1038
1039 fn record_success(&self, duration: Duration) {
1042 let mut inner = self.inner.write().unwrap();
1043 inner.consecutive_successes += 1;
1044 inner.consecutive_failures = 0;
1045
1046 if inner.results_window.len() >= self.config.sliding_window_size {
1048 inner.results_window.pop_front();
1049 }
1050 inner.results_window.push_back(true);
1051
1052 let was_slow = if let Some(threshold) = self.config.slow_call_duration_threshold {
1054 duration >= threshold
1055 } else {
1056 false
1057 };
1058
1059 if inner.slow_call_window.len() >= self.config.sliding_window_size {
1061 inner.slow_call_window.pop_front();
1062 }
1063 inner.slow_call_window.push_back(was_slow);
1064
1065 inner.metrics.total_requests += 1;
1067 inner.metrics.successful_requests += 1;
1068 inner.metrics.consecutive_successes = inner.consecutive_successes as u32;
1069 inner.metrics.consecutive_failures = 0;
1070
1071 self.update_rates(&mut inner);
1073
1074 drop(inner);
1075
1076 self.notify_operation_result(CircuitOperationType::Success, duration, None);
1077 }
1078
1079 fn record_failure(&self, error: &DecrustError, duration: Duration) {
1080 let mut inner = self.inner.write().unwrap();
1081 inner.consecutive_failures += 1;
1082 inner.consecutive_successes = 0;
1083
1084 if inner.results_window.len() >= self.config.sliding_window_size {
1086 inner.results_window.pop_front();
1087 }
1088 inner.results_window.push_back(false);
1089
1090 let was_slow = if let Some(threshold) = self.config.slow_call_duration_threshold {
1092 duration >= threshold
1093 } else {
1094 false
1095 };
1096
1097 if inner.slow_call_window.len() >= self.config.sliding_window_size {
1099 inner.slow_call_window.pop_front();
1100 }
1101 inner.slow_call_window.push_back(was_slow);
1102
1103 inner.metrics.total_requests += 1;
1105 inner.metrics.failed_requests += 1;
1106 inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
1107 inner.metrics.consecutive_successes = 0;
1108 inner.metrics.last_error_timestamp = Some(SystemTime::now());
1109
1110 self.update_rates(&mut inner);
1112
1113 let error_clone = error.clone(); drop(inner);
1115
1116 self.notify_operation_result(CircuitOperationType::Failure, duration, Some(&error_clone));
1117 }
1118
1119 fn record_rejected(&self) {
1120 let mut inner = self.inner.write().unwrap();
1121 inner.metrics.total_requests += 1;
1122 inner.metrics.rejected_requests += 1;
1123 drop(inner);
1124
1125 self.notify_operation_result(CircuitOperationType::Rejected, Duration::from_secs(0), None);
1127 }
1128
1129 fn record_timeout(&self) {
1130 let mut inner = self.inner.write().unwrap();
1131 inner.consecutive_failures += 1;
1132 inner.consecutive_successes = 0;
1133
1134 if inner.results_window.len() >= self.config.sliding_window_size {
1136 inner.results_window.pop_front();
1137 }
1138 inner.results_window.push_back(false);
1139
1140 inner.metrics.total_requests += 1;
1142 inner.metrics.timeout_requests += 1;
1143 inner.metrics.consecutive_failures = inner.consecutive_failures as u32;
1144 inner.metrics.consecutive_successes = 0;
1145 inner.metrics.last_error_timestamp = Some(SystemTime::now());
1146
1147 self.update_rates(&mut inner);
1149
1150 drop(inner);
1151
1152 let timeout_error = DecrustError::Timeout {
1153 operation: format!("Operation in circuit breaker '{}'", self.name),
1154 duration: self.config.operation_timeout.unwrap_or_default(),
1155 backtrace: Backtrace::generate(),
1156 };
1157
1158 self.notify_operation_result(
1159 CircuitOperationType::Timeout,
1160 self.config.operation_timeout.unwrap_or_default(),
1161 Some(&timeout_error),
1162 );
1163 }
1164
1165 fn should_open_circuit(&self) -> bool {
1168 let inner = self.inner.read().unwrap();
1169
1170 if inner.consecutive_failures >= self.config.failure_threshold {
1172 return true;
1173 }
1174
1175 if inner.results_window.len() >= self.config.minimum_request_threshold_for_rate {
1177 let failure_count = inner
1178 .results_window
1179 .iter()
1180 .filter(|&&success| !success)
1181 .count();
1182 let failure_rate = failure_count as f64 / inner.results_window.len() as f64;
1183
1184 if failure_rate >= self.config.failure_rate_threshold {
1185 return true;
1186 }
1187 }
1188
1189 if let (Some(threshold), true) = (
1191 self.config.slow_call_rate_threshold,
1192 !inner.slow_call_window.is_empty(),
1193 ) {
1194 let slow_count = inner.slow_call_window.iter().filter(|&&slow| slow).count();
1195 let slow_rate = slow_count as f64 / inner.slow_call_window.len() as f64;
1196
1197 if slow_rate >= threshold {
1198 return true;
1199 }
1200 }
1201
1202 false
1203 }
1204
1205 fn should_count_as_failure(&self, error: &DecrustError) -> bool {
1206 if let Some(predicate) = &self.config.error_predicate {
1208 return (predicate.as_ref())(error);
1209 }
1210
1211 true
1213 }
1214
1215 fn update_rates(&self, inner: &mut InnerState) {
1216 if inner.results_window.is_empty() {
1217 inner.metrics.failure_rate_in_window = None;
1218 } else {
1219 let failure_count = inner
1220 .results_window
1221 .iter()
1222 .filter(|&&success| !success)
1223 .count();
1224 let failure_rate = failure_count as f64 / inner.results_window.len() as f64;
1225 inner.metrics.failure_rate_in_window = Some(failure_rate);
1226 }
1227
1228 if inner.slow_call_window.is_empty() {
1229 inner.metrics.slow_call_rate_in_window = None;
1230 } else {
1231 let slow_count = inner.slow_call_window.iter().filter(|&&slow| slow).count();
1232 let slow_rate = slow_count as f64 / inner.slow_call_window.len() as f64;
1233 inner.metrics.slow_call_rate_in_window = Some(slow_rate);
1234 }
1235 }
1236
1237 fn notify_state_change(&self, event: &CircuitTransitionEvent) {
1240 let observers = self.observers.lock().unwrap();
1241 for observer in &*observers {
1242 observer.on_state_change(&self.name, event);
1243 }
1244 }
1245
1246 fn notify_operation_attempt(&self, state: CircuitBreakerState) {
1247 let observers = self.observers.lock().unwrap();
1248 for observer in &*observers {
1249 observer.on_operation_attempt(&self.name, state);
1250 }
1251 }
1252
1253 fn notify_operation_result(
1254 &self,
1255 op_type: CircuitOperationType,
1256 duration: Duration,
1257 error: Option<&DecrustError>,
1258 ) {
1259 let observers = self.observers.lock().unwrap();
1260 for observer in &*observers {
1261 observer.on_operation_result(&self.name, op_type, duration, error);
1262 }
1263 }
1264 fn notify_reset(&self) {
1265 let observers = self.observers.lock().unwrap();
1266 for observer in &*observers {
1267 observer.on_reset(&self.name);
1268 }
1269 }
1270}