tower_circuitbreaker/
lib.rs

1//! Circuit breaker pattern for Tower services.
2//!
3//! A circuit breaker prevents cascading failures by monitoring service calls and
4//! temporarily blocking requests when the failure rate exceeds a threshold.
5//!
6//! ## States
7//! - **Closed**: Normal operation, all requests pass through
8//! - **Open**: Circuit is tripped, requests are rejected immediately
9//! - **Half-Open**: Testing if service has recovered, limited requests allowed
10//!
11//! ## Basic Example
12//!
13//! ```rust
14//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
15//! use tower::service_fn;
16//! use std::time::Duration;
17//!
18//! # async fn example() {
19//! let layer = CircuitBreakerConfig::<String, ()>::builder()
20//!     .failure_rate_threshold(0.5)  // Open at 50% failure rate
21//!     .sliding_window_size(100)     // Track last 100 calls
22//!     .wait_duration_in_open(Duration::from_secs(30))
23//!     .build();
24//!
25//! let svc = service_fn(|req: String| async move {
26//!     Ok::<String, ()>(req)
27//! });
28//! let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
29//! # }
30//! ```
31//!
32//! ## Time-Based Sliding Window
33//!
34//! Use time-based windows instead of count-based:
35//!
36//! ```rust
37//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker, SlidingWindowType};
38//! use tower::service_fn;
39//! use std::time::Duration;
40//!
41//! # async fn example() {
42//! let layer = CircuitBreakerConfig::<String, ()>::builder()
43//!     .failure_rate_threshold(0.5)
44//!     .sliding_window_type(SlidingWindowType::TimeBased)
45//!     .sliding_window_duration(Duration::from_secs(60))  // Last 60 seconds
46//!     .minimum_number_of_calls(10)
47//!     .build();
48//!
49//! let svc = service_fn(|req: String| async move {
50//!     Ok::<String, ()>(req)
51//! });
52//! let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
53//! # }
54//! ```
55//!
56//! ## Fallback Handler
57//!
//! Provide fallback responses when the circuit is open:
59//!
60//! ```rust
61//! use tower_circuitbreaker::CircuitBreakerConfig;
62//! use tower::service_fn;
63//! use std::time::Duration;
64//! use futures::future::BoxFuture;
65//!
66//! # async fn example() {
67//! let layer = CircuitBreakerConfig::<String, ()>::builder()
68//!     .failure_rate_threshold(0.5)
69//!     .sliding_window_size(100)
70//!     .build();
71//!
72//! let base_service = service_fn(|req: String| async move {
73//!     Ok::<String, ()>(req)
74//! });
75//!
76//! let mut service = layer.layer(base_service)
77//!     .with_fallback(|_req: String| -> BoxFuture<'static, Result<String, ()>> {
78//!         Box::pin(async {
79//!             Ok("fallback response".to_string())
80//!         })
81//!     });
82//! # }
83//! ```
84//!
85//! ## Custom Failure Classification
86//!
87//! Control what counts as a failure:
88//!
89//! ```rust
90//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
91//! use tower::service_fn;
92//! use std::time::Duration;
93//!
94//! # async fn example() {
95//! let layer = CircuitBreakerConfig::<String, std::io::Error>::builder()
96//!     .failure_rate_threshold(0.5)
97//!     .sliding_window_size(100)
98//!     .failure_classifier(|result: &Result<String, std::io::Error>| {
99//!         match result {
100//!             // Don't count timeouts as failures
101//!             Err(e) if e.kind() == std::io::ErrorKind::TimedOut => false,
102//!             Err(_) => true,
103//!             Ok(_) => false,
104//!         }
105//!     })
106//!     .build();
107//!
108//! let svc = service_fn(|req: String| async move {
109//!     Ok::<String, std::io::Error>(req)
110//! });
111//! let mut service: CircuitBreaker<_, String, String, std::io::Error> = layer.layer(svc);
112//! # }
113//! ```
114//!
115//! ## Slow Call Detection
116//!
//! Open the circuit based on slow calls:
118//!
119//! ```rust
120//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
121//! use tower::service_fn;
122//! use std::time::Duration;
123//!
124//! # async fn example() {
125//! let layer = CircuitBreakerConfig::<String, ()>::builder()
126//!     .failure_rate_threshold(1.0)  // Don't open on failures
127//!     .slow_call_duration_threshold(Duration::from_secs(2))
128//!     .slow_call_rate_threshold(0.5)  // Open at 50% slow calls
129//!     .sliding_window_size(100)
130//!     .build();
131//!
132//! let svc = service_fn(|req: String| async move {
133//!     Ok::<String, ()>(req)
134//! });
135//! let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
136//! # }
137//! ```
138//!
139//! ## Event Listeners
140//!
141//! Monitor circuit breaker behavior:
142//!
143//! ```rust
144//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
145//! use tower::service_fn;
146//! use std::time::Duration;
147//!
148//! # async fn example() {
149//! let layer = CircuitBreakerConfig::<String, ()>::builder()
150//!     .failure_rate_threshold(0.5)
151//!     .sliding_window_size(100)
152//!     .on_state_transition(|from, to| {
153//!         println!("Circuit breaker: {:?} -> {:?}", from, to);
154//!     })
155//!     .on_call_permitted(|state| {
156//!         println!("Call permitted in state: {:?}", state);
157//!     })
158//!     .on_call_rejected(|| {
159//!         println!("Call rejected - circuit open");
160//!     })
161//!     .on_slow_call(|duration| {
162//!         println!("Slow call detected: {:?}", duration);
163//!     })
164//!     .build();
165//!
166//! let svc = service_fn(|req: String| async move {
167//!     Ok::<String, ()>(req)
168//! });
169//! let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
170//! # }
171//! ```
172//!
173//! ## Error Handling
174//!
175//! ```rust
176//! use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreakerError};
177//! use tower::{Service, service_fn};
178//!
179//! # async fn example() {
180//! let layer = CircuitBreakerConfig::<String, ()>::builder().build();
181//! let mut service = layer.layer(service_fn(|req: String| async move {
182//!     Ok::<_, ()>(req)
183//! }));
184//!
185//! match service.call("request".to_string()).await {
186//!     Ok(response) => println!("Success: {}", response),
187//!     Err(CircuitBreakerError::OpenCircuit) => {
188//!         eprintln!("Circuit breaker is open");
189//!     }
190//!     Err(CircuitBreakerError::Inner(e)) => {
191//!         eprintln!("Service error: {:?}", e);
192//!     }
193//! }
194//! # }
195//! ```
196//!
197//! ## Features
198//! - Count-based and time-based sliding windows
199//! - Configurable failure rate threshold
200//! - Slow call detection and rate threshold
201//! - Half-open state for gradual recovery
202//! - Event system for observability
203//! - Optional fallback handling
204//! - Manual state control (force_open, force_closed, reset)
205//! - Sync state inspection with `state_sync()`
206//! - Metrics integration via `metrics` feature
207//! - Tracing support via `tracing` feature
208//!
209//! ## Feature Flags
210//! - `metrics`: enables metrics collection using the `metrics` crate
211//! - `tracing`: enables logging and tracing using the `tracing` crate
212
213use crate::circuit::Circuit;
214use futures::future::BoxFuture;
215#[cfg(feature = "metrics")]
216use metrics::{counter, describe_counter, describe_gauge};
217use std::sync::Arc;
218#[cfg(feature = "metrics")]
219use std::sync::Once;
220use std::task::{Context, Poll};
221use tokio::sync::Mutex;
222use tower::Service;
223#[cfg(feature = "tracing")]
224use tracing::debug;
225
226pub use circuit::CircuitState;
227pub use config::{CircuitBreakerConfig, CircuitBreakerConfigBuilder, SlidingWindowType};
228pub use error::CircuitBreakerError;
229pub use events::CircuitBreakerEvent;
230pub use layer::CircuitBreakerLayer;
231
232mod circuit;
233mod config;
234mod error;
235mod events;
236mod layer;
237
/// Predicate over the outcome of a call: returns `true` when the `Result`
/// should be recorded as a failure and `false` when it should be recorded as a
/// success.
pub(crate) type FailureClassifier<Res, Err> = dyn Fn(&Result<Res, Err>) -> bool + Send + Sync;
/// Reference-counted failure classifier, cheap to clone and shareable across
/// threads. Spelled out in full; it is the same type as
/// `Arc<FailureClassifier<Res, Err>>`.
pub(crate) type SharedFailureClassifier<Res, Err> =
    Arc<dyn Fn(&Result<Res, Err>) -> bool + Send + Sync>;
240
241pub(crate) type FallbackFn<Req, Res, Err> =
242    dyn Fn(Req) -> BoxFuture<'static, Result<Res, Err>> + Send + Sync;
243pub(crate) type SharedFallback<Req, Res, Err> = Arc<FallbackFn<Req, Res, Err>>;
244
245#[cfg(feature = "metrics")]
246static METRICS_INIT: Once = Once::new();
247
248/// Returns a new builder for a `CircuitBreakerLayer`.
249///
250/// This is a convenience function that returns a builder. You can also use
251/// `CircuitBreakerConfig::builder()` directly.
252pub fn circuit_breaker_builder<Res, Err>() -> CircuitBreakerConfigBuilder<Res, Err> {
253    #[cfg(feature = "metrics")]
254    {
255        METRICS_INIT.call_once(|| {
256            describe_counter!(
257                "circuitbreaker_calls_total",
258                "Total number of calls through the circuit breaker"
259            );
260            describe_counter!(
261                "circuitbreaker_transitions_total",
262                "Total number of circuit breaker state transitions"
263            );
264            describe_gauge!(
265                "circuitbreaker_state",
266                "Current state of the circuit breaker"
267            );
268        });
269    }
270    CircuitBreakerConfigBuilder::default()
271}
272
273/// A Tower Service that applies circuit breaker logic to an inner service.
274///
275/// Manages the circuit state and controls calls to the inner service accordingly.
276pub struct CircuitBreaker<S, Req, Res, Err> {
277    inner: S,
278    circuit: Arc<Mutex<Circuit>>,
279    state_atomic: Arc<std::sync::atomic::AtomicU8>,
280    config: Arc<CircuitBreakerConfig<Res, Err>>,
281    fallback: Option<SharedFallback<Req, Res, Err>>,
282    _phantom: std::marker::PhantomData<Req>,
283}
284
285impl<S, Req, Res, Err> CircuitBreaker<S, Req, Res, Err> {
286    /// Creates a new `CircuitBreaker` wrapping the given service and configuration.
287    pub(crate) fn new(inner: S, config: Arc<CircuitBreakerConfig<Res, Err>>) -> Self {
288        let state_atomic = Arc::new(std::sync::atomic::AtomicU8::new(CircuitState::Closed as u8));
289        Self {
290            inner,
291            circuit: Arc::new(Mutex::new(Circuit::new_with_atomic(Arc::clone(
292                &state_atomic,
293            )))),
294            state_atomic,
295            config,
296            fallback: None,
297            _phantom: std::marker::PhantomData,
298        }
299    }
300
301    /// Sets a fallback function to be called when the circuit is open.
302    pub fn with_fallback<F>(mut self, fallback: F) -> Self
303    where
304        F: Fn(Req) -> BoxFuture<'static, Result<Res, Err>> + Send + Sync + 'static,
305    {
306        self.fallback = Some(Arc::new(fallback));
307        self
308    }
309
310    /// Forces the circuit into the open state.
311    pub async fn force_open(&self) {
312        let mut circuit = self.circuit.lock().await;
313        circuit.force_open(&self.config);
314    }
315
316    /// Forces the circuit into the closed state.
317    pub async fn force_closed(&self) {
318        let mut circuit = self.circuit.lock().await;
319        circuit.force_closed(&self.config);
320    }
321
322    /// Resets the circuit to the closed state and clears counts.
323    pub async fn reset(&self) {
324        let mut circuit = self.circuit.lock().await;
325        circuit.reset(&self.config);
326    }
327
328    /// Returns the current state of the circuit.
329    pub async fn state(&self) -> CircuitState {
330        let circuit = self.circuit.lock().await;
331        circuit.state()
332    }
333
334    /// Returns the current state of the circuit without requiring async context.
335    ///
336    /// This is safe to call from sync code (e.g., metrics collection, health checks).
337    /// Reads from an AtomicU8 that's kept synchronized with the actual state.
338    pub fn state_sync(&self) -> CircuitState {
339        CircuitState::from_u8(self.state_atomic.load(std::sync::atomic::Ordering::Acquire))
340    }
341}
342
343impl<S, Req, Res, Err> Service<Req> for CircuitBreaker<S, Req, Res, Err>
344where
345    S: Service<Req, Response = Res, Error = Err> + Clone + Send + 'static,
346    S::Future: Send + 'static,
347    Res: Send + 'static,
348    Err: Send + 'static,
349    Req: Send + 'static,
350{
351    type Response = Res;
352    type Error = CircuitBreakerError<Err>;
353    type Future = BoxFuture<'static, Result<Res, Self::Error>>;
354
355    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
356        self.inner
357            .poll_ready(cx)
358            .map_err(CircuitBreakerError::Inner)
359    }
360
361    fn call(&mut self, req: Req) -> Self::Future {
362        let config = Arc::clone(&self.config);
363        let circuit = Arc::clone(&self.circuit);
364        let mut inner = self.inner.clone();
365        let fallback = self.fallback.clone();
366
367        Box::pin(async move {
368            #[cfg(feature = "tracing")]
369            {
370                let cb_name = &config.name;
371                debug!(
372                    breaker = cb_name,
373                    "Checking if call is permitted by circuit breaker"
374                );
375            }
376
377            #[cfg(feature = "tracing")]
378            let circuit_check_span = {
379                use tracing::{Level, span};
380                let state = {
381                    // To avoid holding the lock too long, just get the state for span field.
382                    let circuit = circuit.lock().await;
383                    circuit.state()
384                };
385                let cb_name = &config.name;
386                span!(Level::DEBUG, "circuit_check", breaker = cb_name, state = ?state)
387            };
388            #[cfg(feature = "tracing")]
389            let _enter = circuit_check_span.enter();
390
391            let permitted = {
392                let mut circuit = circuit.lock().await;
393                circuit.try_acquire(&config)
394            };
395
396            #[cfg(feature = "tracing")]
397            {
398                let cb_name = &config.name;
399                if permitted {
400                    tracing::trace!(breaker = cb_name, "circuit breaker permitted call");
401                } else {
402                    tracing::trace!(
403                        breaker = cb_name,
404                        "circuit breaker rejected call (circuit open)"
405                    );
406                }
407            }
408
409            if !permitted {
410                #[cfg(feature = "metrics")]
411                {
412                    let counter = counter!("circuitbreaker_calls_total", "outcome" => "rejected");
413                    counter.increment(1);
414                }
415
416                // If a fallback is configured, call it instead of returning an error
417                if let Some(fallback_fn) = fallback {
418                    #[cfg(feature = "tracing")]
419                    {
420                        let cb_name = &config.name;
421                        tracing::debug!(breaker = cb_name, "Calling fallback handler");
422                    }
423
424                    return fallback_fn(req).await.map_err(CircuitBreakerError::Inner);
425                }
426
427                return Err(CircuitBreakerError::OpenCircuit);
428            }
429
430            let start = std::time::Instant::now();
431            let result = inner.call(req).await;
432            let duration = start.elapsed();
433
434            let mut circuit = circuit.lock().await;
435            if (config.failure_classifier)(&result) {
436                circuit.record_failure(&config, duration);
437            } else {
438                circuit.record_success(&config, duration);
439            }
440
441            result.map_err(CircuitBreakerError::Inner)
442        })
443    }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Baseline configuration shared by the tests: count-based window of 10
    /// calls, 50% failure threshold, slow-call detection disabled, and no
    /// event listeners.
    fn dummy_config() -> CircuitBreakerConfig<(), ()> {
        use tower_resilience_core::EventListeners;
        CircuitBreakerConfig {
            failure_rate_threshold: 0.5,
            sliding_window_type: crate::config::SlidingWindowType::CountBased,
            sliding_window_size: 10,
            sliding_window_duration: None,
            wait_duration_in_open: Duration::from_secs(1),
            permitted_calls_in_half_open: 1,
            failure_classifier: Arc::new(|r| r.is_err()),
            minimum_number_of_calls: 10,
            slow_call_duration_threshold: None,
            slow_call_rate_threshold: 1.0,
            event_listeners: EventListeners::new(),
            name: "test".into(),
        }
    }

    /// Records `count` failed calls of the given duration on `circuit`.
    fn record_failures(
        circuit: &mut Circuit,
        config: &CircuitBreakerConfig<(), ()>,
        count: usize,
        duration: Duration,
    ) {
        for _ in 0..count {
            circuit.record_failure(config, duration);
        }
    }

    /// Records `count` successful calls of the given duration on `circuit`.
    fn record_successes(
        circuit: &mut Circuit,
        config: &CircuitBreakerConfig<(), ()>,
        count: usize,
        duration: Duration,
    ) {
        for _ in 0..count {
            circuit.record_success(config, duration);
        }
    }

    #[test]
    fn transitions_to_open_on_high_failure_rate() {
        let config = dummy_config();
        let mut circuit = Circuit::new();
        let fast = Duration::from_millis(10);

        // 6 failures out of 10 calls is above the 50% threshold.
        record_failures(&mut circuit, &config, 6, fast);
        record_successes(&mut circuit, &config, 4, fast);

        assert_eq!(circuit.state(), CircuitState::Open);
    }

    #[test]
    fn stays_closed_on_low_failure_rate() {
        let config = dummy_config();
        let mut circuit = Circuit::new();
        let fast = Duration::from_millis(10);

        // 2 failures out of 10 calls is below the 50% threshold.
        record_failures(&mut circuit, &config, 2, fast);
        record_successes(&mut circuit, &config, 8, fast);

        assert_eq!(circuit.state(), CircuitState::Closed);
    }

    #[tokio::test]
    async fn manual_override_controls_work() {
        let breaker: CircuitBreaker<(), (), (), ()> =
            CircuitBreaker::new((), Arc::new(dummy_config()));

        breaker.force_open().await;
        assert_eq!(breaker.state().await, CircuitState::Open);

        breaker.force_closed().await;
        assert_eq!(breaker.state().await, CircuitState::Closed);
    }

    #[test]
    fn test_error_helpers() {
        let open: CircuitBreakerError<&str> = CircuitBreakerError::OpenCircuit;
        assert!(open.is_circuit_open());
        assert_eq!(open.into_inner(), None);

        let inner = CircuitBreakerError::Inner("fail");
        assert!(!inner.is_circuit_open());
        assert_eq!(inner.into_inner(), Some("fail"));
    }

    #[test]
    fn test_event_listeners() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use tower_resilience_core::EventListeners;

        // One counter per event kind the test asserts on (or at least routes).
        let state_transitions = Arc::new(AtomicUsize::new(0));
        let call_permitted = Arc::new(AtomicUsize::new(0));
        let call_rejected = Arc::new(AtomicUsize::new(0));
        let successes = Arc::new(AtomicUsize::new(0));
        let failures = Arc::new(AtomicUsize::new(0));

        // Handles moved into the listener closure.
        let transitions_seen = Arc::clone(&state_transitions);
        let permitted_seen = Arc::clone(&call_permitted);
        let rejected_seen = Arc::clone(&call_rejected);
        let successes_seen = Arc::clone(&successes);
        let failures_seen = Arc::clone(&failures);

        let config: CircuitBreakerConfig<(), ()> = CircuitBreakerConfig {
            event_listeners: {
                let mut listeners = EventListeners::new();
                listeners.add(tower_resilience_core::FnListener::new(
                    move |event| match event {
                        CircuitBreakerEvent::StateTransition { .. } => {
                            transitions_seen.fetch_add(1, Ordering::SeqCst);
                        }
                        CircuitBreakerEvent::CallPermitted { .. } => {
                            permitted_seen.fetch_add(1, Ordering::SeqCst);
                        }
                        CircuitBreakerEvent::CallRejected { .. } => {
                            rejected_seen.fetch_add(1, Ordering::SeqCst);
                        }
                        CircuitBreakerEvent::SuccessRecorded { .. } => {
                            successes_seen.fetch_add(1, Ordering::SeqCst);
                        }
                        CircuitBreakerEvent::FailureRecorded { .. } => {
                            failures_seen.fetch_add(1, Ordering::SeqCst);
                        }
                        CircuitBreakerEvent::SlowCallDetected { .. } => {}
                    },
                ));
                listeners
            },
            ..dummy_config()
        };

        let mut circuit = Circuit::new();
        let fast = Duration::from_millis(10);

        // Record failures to trigger state transition
        record_failures(&mut circuit, &config, 6, fast);
        record_successes(&mut circuit, &config, 4, fast);

        // Should have transitioned to Open
        assert_eq!(circuit.state(), CircuitState::Open);
        assert_eq!(state_transitions.load(Ordering::SeqCst), 1);
        assert_eq!(failures.load(Ordering::SeqCst), 6);
        assert_eq!(successes.load(Ordering::SeqCst), 4);

        // Try acquiring (should be rejected)
        let permitted = circuit.try_acquire(&config);
        assert!(!permitted);
        assert_eq!(call_rejected.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_slow_call_detection() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use tower_resilience_core::EventListeners;

        let slow_calls = Arc::new(AtomicUsize::new(0));
        let slow_seen = Arc::clone(&slow_calls);

        let config: CircuitBreakerConfig<(), ()> = CircuitBreakerConfig {
            slow_call_duration_threshold: Some(Duration::from_millis(100)),
            slow_call_rate_threshold: 0.5,
            event_listeners: {
                let mut listeners = EventListeners::new();
                listeners.add(tower_resilience_core::FnListener::new(move |event| {
                    if matches!(event, CircuitBreakerEvent::SlowCallDetected { .. }) {
                        slow_seen.fetch_add(1, Ordering::SeqCst);
                    }
                }));
                listeners
            },
            ..dummy_config()
        };

        let mut circuit = Circuit::new();

        // Record 6 slow calls (>100ms)
        record_successes(&mut circuit, &config, 6, Duration::from_millis(150));
        // Record 4 fast calls
        record_successes(&mut circuit, &config, 4, Duration::from_millis(50));

        // Should have detected 6 slow calls
        assert_eq!(slow_calls.load(Ordering::SeqCst), 6);

        // Should have transitioned to Open due to slow call rate (60%)
        assert_eq!(circuit.state(), CircuitState::Open);
    }

    #[test]
    fn test_slow_call_with_failures() {
        let config: CircuitBreakerConfig<(), ()> = CircuitBreakerConfig {
            failure_rate_threshold: 1.0, // Don't open on failures
            slow_call_duration_threshold: Some(Duration::from_millis(100)),
            slow_call_rate_threshold: 0.5,
            ..dummy_config()
        };

        let mut circuit = Circuit::new();

        // Record 6 slow failures (failures can also be slow)
        record_failures(&mut circuit, &config, 6, Duration::from_millis(150));
        // Record 4 fast successes
        record_successes(&mut circuit, &config, 4, Duration::from_millis(50));

        // Should open due to slow call rate, not failure rate
        assert_eq!(circuit.state(), CircuitState::Open);
    }

    #[tokio::test]
    async fn test_circuit_breaker_sync_state() {
        let breaker: CircuitBreaker<(), (), (), ()> =
            CircuitBreaker::new((), Arc::new(dummy_config()));

        // Can access state synchronously without .await
        assert_eq!(breaker.state_sync(), CircuitState::Closed);

        // Force open and verify sync state matches
        breaker.force_open().await;
        assert_eq!(breaker.state_sync(), CircuitState::Open);
        assert_eq!(breaker.state().await, CircuitState::Open);
    }
}