Expand description
Bulkhead pattern for Tower services.
The bulkhead pattern isolates resources to prevent cascading failures. This implementation uses semaphore-based concurrency limiting to control the maximum number of concurrent calls to a service.
§Basic Example
use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;
use std::time::Duration;
// Create a bulkhead that allows max 10 concurrent calls
let layer = BulkheadLayer::builder()
.max_concurrent_calls(10)
.name("my-bulkhead")
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
// Your service logic here
Ok::<_, ()>(req)
});
§Rejection Mode (Fail-Fast)
Configure the bulkhead to reject requests immediately when at capacity, rather than queueing them. This is useful for load shedding and providing immediate feedback to callers.
This addresses the use case in tower-rs/tower#793.
use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;
// Reject immediately when at capacity (no waiting)
let layer = BulkheadLayer::builder()
.max_concurrent_calls(100)
.reject_when_full()
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<_, ()>(req)
});
§Example with Timeout
Configure a maximum wait duration for requests when the bulkhead is at capacity:
use tower::ServiceBuilder;
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadError};
use std::time::Duration;
let layer = BulkheadLayer::builder()
.max_concurrent_calls(5)
.max_wait_duration(Duration::from_secs(2))
.name("timeout-bulkhead")
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<_, ()>(req)
});
// Requests will time out if they wait more than 2 seconds
// for bulkhead capacity
§Example with Event Listeners
Monitor bulkhead behavior using event listeners:
use tower::ServiceBuilder;
use tower_resilience_bulkhead::BulkheadLayer;
use std::time::Duration;
let layer = BulkheadLayer::builder()
.max_concurrent_calls(10)
.name("monitored-bulkhead")
.on_call_permitted(|concurrent| {
println!("Call permitted ({} concurrent)", concurrent);
})
.on_call_rejected(|max| {
println!("Call rejected (max {} concurrent)", max);
})
.on_call_finished(|duration| {
println!("Call finished in {:?}", duration);
})
.build();
§Fallback When Bulkhead is Full
Handle bulkhead capacity errors with graceful degradation:
§Shed Load with Informative Error
use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadServiceError};
use tower_resilience_core::ResilienceError;
use std::time::Duration;
let layer = BulkheadLayer::builder()
.max_concurrent_calls(5)
.max_wait_duration(Duration::from_millis(100))
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<String, String>(format!("Processed: {}", req))
});
// Convert BulkheadServiceError to ResilienceError for unified error handling
match service.oneshot("request".to_string()).await {
Ok(response) => println!("Success: {}", response),
Err(e) => {
let resilience_err: ResilienceError<String> = e.into();
if resilience_err.is_bulkhead_full() {
println!("System at capacity - please try again later");
// Could return 503 Service Unavailable in HTTP context
} else if resilience_err.is_timeout() {
println!("Request queued too long - system busy");
} else {
println!("Service error: {:?}", resilience_err);
}
}
}
§Queue for Later Processing
use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::{BulkheadLayer, BulkheadServiceError};
use tower_resilience_core::ResilienceError;
use std::time::Duration;
use std::sync::Arc;
use tokio::sync::Mutex;
let queue = Arc::new(Mutex::new(Vec::new()));
let layer = BulkheadLayer::builder()
.max_concurrent_calls(10)
.max_wait_duration(Duration::from_millis(50))
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<String, String>(req)
});
let queue_clone = Arc::clone(&queue);
let result: Result<String, ResilienceError<String>> = match service.oneshot("request".to_string()).await {
Ok(response) => Ok(response),
Err(e) => {
let resilience_err: ResilienceError<String> = e.into();
if resilience_err.is_bulkhead_full() || resilience_err.is_timeout() {
// Queue for background processing
queue_clone.lock().await.push("request".to_string());
Ok("Queued for processing".to_string())
} else {
Err(resilience_err)
}
}
};
§Return Degraded Response
use tower::{ServiceBuilder, ServiceExt};
use tower_resilience_bulkhead::BulkheadLayer;
use tower_resilience_core::ResilienceError;
use std::time::Duration;
let layer = BulkheadLayer::builder()
.max_concurrent_calls(5)
.max_wait_duration(Duration::from_millis(10))
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<String, ResilienceError<String>>(format!("Full response: {}", req))
});
let result = service.oneshot("data".to_string()).await
.unwrap_or_else(|_| {
// Provide degraded functionality instead of full failure
"Degraded mode: limited data available".to_string()
});
§Error Handling
The bulkhead passes through the inner service’s errors directly. Use event listeners to track bulkhead rejections:
use tower_resilience_bulkhead::BulkheadLayer;
use tower::ServiceBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let rejections = Arc::new(AtomicUsize::new(0));
let r = rejections.clone();
let layer = BulkheadLayer::builder()
.max_concurrent_calls(5)
.on_call_rejected(move |_| {
r.fetch_add(1, Ordering::SeqCst);
})
.build();
let service = ServiceBuilder::new()
.layer(layer)
.service_fn(|req: String| async move {
Ok::<_, ()>(req)
});
// Check rejections counter to monitor bulkhead behavior
println!("Rejections: {}", rejections.load(Ordering::SeqCst));
Re-exports§
pub use config::BulkheadConfig;
pub use config::BulkheadConfigBuilder;
pub use error::BulkheadError;
pub use error::BulkheadServiceError;
pub use error::Result;
pub use events::BulkheadEvent;
pub use layer::BulkheadLayer;
pub use service::Bulkhead;
Modules§
- config
- Configuration types for the bulkhead pattern.
- error
- Error types for the bulkhead pattern, including bulkhead rejections.
- events
- Event types emitted by the bulkhead.
- layer
- Tower Layer implementation for the bulkhead.
- service
- Tower Service implementation for the bulkhead.
Structs§
- BulkheadHandle
- A read-only handle for observing bulkhead state.