tower_bulkhead/lib.rs
1//! Bulkhead pattern for Tower services.
2//!
3//! The bulkhead pattern isolates resources to prevent cascading failures.
4//! This implementation uses semaphore-based concurrency limiting to control
5//! the maximum number of concurrent calls to a service.
6//!
7//! # Basic Example
8//!
9//! ```rust
10//! use tower::ServiceBuilder;
11//! use tower_bulkhead::BulkheadConfig;
12//! use std::time::Duration;
13//!
14//! # async fn example() {
15//! // Create a bulkhead that allows max 10 concurrent calls
16//! let layer = BulkheadConfig::builder()
17//! .max_concurrent_calls(10)
18//! .name("my-bulkhead")
19//! .build();
20//!
21//! let service = ServiceBuilder::new()
22//! .layer(layer)
23//! .service_fn(|req: String| async move {
24//! // Your service logic here
25//! Ok::<_, ()>(req)
26//! });
27//! # }
28//! ```
29//!
30//! # Example with Timeout
31//!
32//! Configure a maximum wait duration for requests when the bulkhead is at capacity:
33//!
34//! ```rust
35//! use tower::ServiceBuilder;
36//! use tower_bulkhead::{BulkheadConfig, BulkheadError};
37//! use std::time::Duration;
38//!
39//! # async fn example() {
40//! let layer = BulkheadConfig::builder()
41//! .max_concurrent_calls(5)
42//! .max_wait_duration(Some(Duration::from_secs(2)))
43//! .name("timeout-bulkhead")
44//! .build();
45//!
46//! let service = ServiceBuilder::new()
47//! .layer(layer)
48//! .service_fn(|req: String| async move {
49//! Ok::<_, ()>(req)
50//! });
51//!
52//! // Requests will timeout if they wait more than 2 seconds
53//! // for bulkhead capacity
54//! # }
55//! ```
56//!
57//! # Example with Event Listeners
58//!
59//! Monitor bulkhead behavior using event listeners:
60//!
61//! ```rust
62//! use tower::ServiceBuilder;
63//! use tower_bulkhead::BulkheadConfig;
64//! use std::time::Duration;
65//!
66//! # async fn example() {
67//! let layer = BulkheadConfig::builder()
68//! .max_concurrent_calls(10)
69//! .name("monitored-bulkhead")
70//! .on_call_permitted(|concurrent| {
71//! println!("Call permitted ({} concurrent)", concurrent);
72//! })
73//! .on_call_rejected(|max| {
74//! println!("Call rejected (max {} concurrent)", max);
75//! })
76//! .on_call_finished(|duration| {
77//! println!("Call finished in {:?}", duration);
78//! })
79//! .build();
80//!
81//! let service = ServiceBuilder::new()
82//! .layer(layer)
83//! .service_fn(|req: String| async move {
84//! Ok::<_, ()>(req)
85//! });
86//! # }
87//! ```
88//!
89//! # Error Handling
90//!
91//! The bulkhead passes through the inner service's errors directly.
92//! Use event listeners to track bulkhead rejections:
93//!
94//! ```rust
95//! use tower_bulkhead::BulkheadConfig;
96//! use tower::ServiceBuilder;
97//! use std::sync::atomic::{AtomicUsize, Ordering};
98//! use std::sync::Arc;
99//!
100//! # async fn example() {
101//! let rejections = Arc::new(AtomicUsize::new(0));
102//! let r = rejections.clone();
103//!
104//! let layer = BulkheadConfig::builder()
105//! .max_concurrent_calls(5)
106//! .on_call_rejected(move |_| {
107//! r.fetch_add(1, Ordering::SeqCst);
108//! })
109//! .build();
110//!
111//! let service = ServiceBuilder::new()
112//! .layer(layer)
113//! .service_fn(|req: String| async move {
114//! Ok::<_, ()>(req)
115//! });
116//!
117//! // Check rejections counter to monitor bulkhead behavior
118//! println!("Rejections: {}", rejections.load(Ordering::SeqCst));
119//! # }
120//! ```
121
122pub mod config;
123pub mod error;
124pub mod events;
125pub mod layer;
126pub mod service;
127
128pub use config::{BulkheadConfig, BulkheadConfigBuilder};
129pub use error::{BulkheadError, Result};
130pub use events::BulkheadEvent;
131pub use layer::BulkheadLayer;
132pub use service::Bulkhead;
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use std::sync::Arc;
138 use std::sync::atomic::{AtomicUsize, Ordering};
139 use std::time::Duration;
140
141 #[test]
142 fn test_config_builder_defaults() {
143 let _config = BulkheadConfig::builder().build();
144 // Layer is built, so we can't inspect config directly
145 // This test just ensures the builder works
146 }
147
148 #[test]
149 fn test_config_builder_with_custom_values() {
150 let counter = Arc::new(AtomicUsize::new(0));
151 let c = Arc::clone(&counter);
152
153 let _layer = BulkheadConfig::builder()
154 .max_concurrent_calls(5)
155 .max_wait_duration(Some(Duration::from_millis(100)))
156 .name("test-bulkhead")
157 .on_call_permitted(move |_| {
158 c.fetch_add(1, Ordering::SeqCst);
159 })
160 .build();
161
162 // Builder accepts all parameters without panic
163 }
164
165 #[test]
166 fn test_bulkhead_error_display() {
167 let err = BulkheadError::BulkheadFull {
168 max_concurrent_calls: 10,
169 };
170 assert!(err.to_string().contains("10"));
171
172 let err = BulkheadError::Timeout;
173 assert!(err.to_string().contains("timeout"));
174 }
175
176 #[test]
177 fn test_bulkhead_event_types() {
178 use std::time::Instant;
179 use tower_resilience_core::events::ResilienceEvent;
180
181 let event = BulkheadEvent::CallPermitted {
182 pattern_name: "test".to_string(),
183 timestamp: Instant::now(),
184 concurrent_calls: 5,
185 };
186 assert_eq!(event.event_type(), "call_permitted");
187 assert_eq!(event.pattern_name(), "test");
188
189 let event = BulkheadEvent::CallRejected {
190 pattern_name: "test".to_string(),
191 timestamp: Instant::now(),
192 max_concurrent_calls: 10,
193 };
194 assert_eq!(event.event_type(), "call_rejected");
195
196 let event = BulkheadEvent::CallFinished {
197 pattern_name: "test".to_string(),
198 timestamp: Instant::now(),
199 duration: Duration::from_millis(50),
200 };
201 assert_eq!(event.event_type(), "call_finished");
202
203 let event = BulkheadEvent::CallFailed {
204 pattern_name: "test".to_string(),
205 timestamp: Instant::now(),
206 duration: Duration::from_millis(50),
207 };
208 assert_eq!(event.event_type(), "call_failed");
209 }
210}