Skip to main content

Crate tower_resilience_bulkhead

Crate tower_resilience_bulkhead 

Source
Expand description

Bulkhead pattern for Tower services.

The bulkhead pattern isolates resources to prevent cascading failures. This implementation uses semaphore-based concurrency limiting to control the maximum number of concurrent calls to a service.

§Basic Example

use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;
use std::time::Duration;

// Create a bulkhead that allows max 10 concurrent calls
let layer = BulkheadLayer::builder()
    .max_concurrent_calls(10)
    .name("my-bulkhead")
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        // Your service logic here
        Ok::<_, ()>(req)
    });

§Rejection Mode (Fail-Fast)

Configure the bulkhead to reject requests immediately when at capacity, rather than queueing them. This is useful for load shedding and providing immediate feedback to callers.

This addresses the use case in tower-rs/tower#793.

use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;

// Reject immediately when at capacity (no waiting)
let layer = BulkheadLayer::builder()
    .max_concurrent_calls(100)
    .reject_when_full()
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<_, ()>(req)
    });

§Example with Timeout

Configure a maximum wait duration for requests when the bulkhead is at capacity:

use tower::ServiceBuilder;
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadError};
use std::time::Duration;

let layer = BulkheadLayer::builder()
    .max_concurrent_calls(5)
    .max_wait_duration(Duration::from_secs(2))
    .name("timeout-bulkhead")
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<_, ()>(req)
    });

// Requests will timeout if they wait more than 2 seconds
// for bulkhead capacity

§Example with Event Listeners

Monitor bulkhead behavior using event listeners:

use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;
use std::time::Duration;

let layer = BulkheadLayer::builder()
    .max_concurrent_calls(10)
    .name("monitored-bulkhead")
    .on_call_permitted(|concurrent| {
        println!("Call permitted ({} concurrent)", concurrent);
    })
    .on_call_rejected(|max| {
        println!("Call rejected (max {} concurrent)", max);
    })
    .on_call_finished(|duration| {
        println!("Call finished in {:?}", duration);
    })
    .build();

§Fallback When Bulkhead is Full

Handle bulkhead capacity errors with graceful degradation:

§Shed Load with Informative Error

use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadServiceError};
use tower_resilience_core::ResilienceError;
use std::time::Duration;

let layer = BulkheadLayer::builder()
    .max_concurrent_calls(5)
    .max_wait_duration(Duration::from_millis(100))
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<String, String>(format!("Processed: {}", req))
    });

// Convert BulkheadServiceError to ResilienceError for unified error handling
match service.oneshot("request".to_string()).await {
    Ok(response) => println!("Success: {}", response),
    Err(e) => {
        let resilience_err: ResilienceError<String> = e.into();
        if resilience_err.is_bulkhead_full() {
            println!("System at capacity - please try again later");
            // Could return 503 Service Unavailable in HTTP context
        } else if resilience_err.is_timeout() {
            println!("Request queued too long - system busy");
        } else {
            println!("Service error: {:?}", resilience_err);
        }
    }
}

§Queue for Later Processing

use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadServiceError};
use tower_resilience_core::ResilienceError;
use std::time::Duration;
use std::sync::Arc;
use tokio::sync::Mutex;

let queue = Arc::new(Mutex::new(Vec::new()));
let layer = BulkheadLayer::builder()
    .max_concurrent_calls(10)
    .max_wait_duration(Duration::from_millis(50))
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<String, String>(req)
    });

let queue_clone = Arc::clone(&queue);
let result: Result<String, ResilienceError<String>> = match service.oneshot("request".to_string()).await {
    Ok(response) => Ok(response),
    Err(e) => {
        let resilience_err: ResilienceError<String> = e.into();
        if resilience_err.is_bulkhead_full() || resilience_err.is_timeout() {
            // Queue for background processing
            queue_clone.lock().await.push("request".to_string());
            Ok("Queued for processing".to_string())
        } else {
            Err(resilience_err)
        }
    }
};

§Return Degraded Response

use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::BulkheadLayer;
use tower_resilience_core::ResilienceError;
use std::time::Duration;

let layer = BulkheadLayer::builder()
    .max_concurrent_calls(5)
    .max_wait_duration(Duration::from_millis(10))
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<String, ResilienceError<String>>(format!("Full response: {}", req))
    });

let result = service.oneshot("data".to_string()).await
    .unwrap_or_else(|_| {
        // Provide degraded functionality instead of full failure
        "Degraded mode: limited data available".to_string()
    });

§Error Handling

The bulkhead passes through the inner service’s errors directly. Use event listeners to track bulkhead rejections:

use tower_resilience_bulkhead::BulkheadLayer;
use tower::ServiceBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

let rejections = Arc::new(AtomicUsize::new(0));
let r = rejections.clone();

let layer = BulkheadLayer::builder()
    .max_concurrent_calls(5)
    .on_call_rejected(move |_| {
        r.fetch_add(1, Ordering::SeqCst);
    })
    .build();

let service = ServiceBuilder::new()
    .layer(layer)
    .service_fn(|req: String| async move {
        Ok::<_, ()>(req)
    });

// Check rejections counter to monitor bulkhead behavior
println!("Rejections: {}", rejections.load(Ordering::SeqCst));

Re-exports§

pub use config::BulkheadConfig;
pub use config::BulkheadConfigBuilder;
pub use error::BulkheadError;
pub use error::BulkheadServiceError;
pub use error::Result;
pub use events::BulkheadEvent;
pub use layer::BulkheadLayer;
pub use service::Bulkhead;

Modules§

config
Configuration types for the bulkhead pattern. Configuration for the bulkhead pattern.
error
Error types for bulkhead rejections. Error types for bulkhead pattern.
events
Event types emitted by the bulkhead. Event types for bulkhead pattern.
layer
Tower Layer implementation for the bulkhead. Tower layer implementation for bulkhead.
service
Tower Service implementation for the bulkhead. Bulkhead service implementation.

Structs§

BulkheadHandle
A read-only handle for observing bulkhead state.