tower-circuitbreaker 0.2.0

A circuit breaker middleware for Tower services
Documentation

Circuit breaker pattern for Tower services.

A circuit breaker prevents cascading failures by monitoring service calls and temporarily blocking requests when the failure rate exceeds a threshold.

States

  • Closed: Normal operation, all requests pass through
  • Open: Circuit is tripped, requests are rejected immediately
  • Half-Open: Testing if service has recovered, limited requests allowed

Basic Example

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
use tower::service_fn;
use std::time::Duration;

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder()
    .failure_rate_threshold(0.5)  // Open at 50% failure rate
    .sliding_window_size(100)     // Track last 100 calls
    .wait_duration_in_open(Duration::from_secs(30))
    .build();

let svc = service_fn(|req: String| async move {
    Ok::<String, ()>(req)
});
let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
# }

Time-Based Sliding Window

Use time-based windows instead of count-based:

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker, SlidingWindowType};
use tower::service_fn;
use std::time::Duration;

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder()
    .failure_rate_threshold(0.5)
    .sliding_window_type(SlidingWindowType::TimeBased)
    .sliding_window_duration(Duration::from_secs(60))  // Last 60 seconds
    .minimum_number_of_calls(10)
    .build();

let svc = service_fn(|req: String| async move {
    Ok::<String, ()>(req)
});
let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
# }

Fallback Handler

Provide fallback responses when circuit is open:

use tower_circuitbreaker::CircuitBreakerConfig;
use tower::service_fn;
use std::time::Duration;
use futures::future::BoxFuture;

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder()
    .failure_rate_threshold(0.5)
    .sliding_window_size(100)
    .build();

let base_service = service_fn(|req: String| async move {
    Ok::<String, ()>(req)
});

let mut service = layer.layer(base_service)
    .with_fallback(|_req: String| -> BoxFuture<'static, Result<String, ()>> {
        Box::pin(async {
            Ok("fallback response".to_string())
        })
    });
# }

Custom Failure Classification

Control what counts as a failure:

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
use tower::service_fn;
use std::time::Duration;

# async fn example() {
let layer = CircuitBreakerConfig::<String, std::io::Error>::builder()
    .failure_rate_threshold(0.5)
    .sliding_window_size(100)
    .failure_classifier(|result: &Result<String, std::io::Error>| {
        match result {
            // Don't count timeouts as failures
            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => false,
            Err(_) => true,
            Ok(_) => false,
        }
    })
    .build();

let svc = service_fn(|req: String| async move {
    Ok::<String, std::io::Error>(req)
});
let mut service: CircuitBreaker<_, String, String, std::io::Error> = layer.layer(svc);
# }

Slow Call Detection

Open circuit based on slow calls:

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
use tower::service_fn;
use std::time::Duration;

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder()
    .failure_rate_threshold(1.0)  // Don't open on failures
    .slow_call_duration_threshold(Duration::from_secs(2))
    .slow_call_rate_threshold(0.5)  // Open at 50% slow calls
    .sliding_window_size(100)
    .build();

let svc = service_fn(|req: String| async move {
    Ok::<String, ()>(req)
});
let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
# }

Event Listeners

Monitor circuit breaker behavior:

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreaker};
use tower::service_fn;
use std::time::Duration;

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder()
    .failure_rate_threshold(0.5)
    .sliding_window_size(100)
    .on_state_transition(|from, to| {
        println!("Circuit breaker: {:?} -> {:?}", from, to);
    })
    .on_call_permitted(|state| {
        println!("Call permitted in state: {:?}", state);
    })
    .on_call_rejected(|| {
        println!("Call rejected - circuit open");
    })
    .on_slow_call(|duration| {
        println!("Slow call detected: {:?}", duration);
    })
    .build();

let svc = service_fn(|req: String| async move {
    Ok::<String, ()>(req)
});
let mut service: CircuitBreaker<_, String, String, ()> = layer.layer(svc);
# }

Error Handling

use tower_circuitbreaker::{CircuitBreakerConfig, CircuitBreakerError};
use tower::{Service, service_fn};

# async fn example() {
let layer = CircuitBreakerConfig::<String, ()>::builder().build();
let mut service = layer.layer(service_fn(|req: String| async move {
    Ok::<_, ()>(req)
}));

match service.call("request".to_string()).await {
    Ok(response) => println!("Success: {}", response),
    Err(CircuitBreakerError::OpenCircuit) => {
        eprintln!("Circuit breaker is open");
    }
    Err(CircuitBreakerError::Inner(e)) => {
        eprintln!("Service error: {:?}", e);
    }
}
# }

Features

  • Count-based and time-based sliding windows
  • Configurable failure rate threshold
  • Slow call detection and rate threshold
  • Half-open state for gradual recovery
  • Event system for observability
  • Optional fallback handling
  • Manual state control (force_open, force_closed, reset)
  • Sync state inspection with state_sync()
  • Metrics integration via metrics feature
  • Tracing support via tracing feature

Feature Flags

  • metrics: enables metrics collection using the metrics crate
  • tracing: enables logging and tracing using the tracing crate