1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
//! Configuration for the bulkhead pattern.
use crate::events::BulkheadEvent;
use std::time::Duration;
use tower_resilience_core::events::{EventListeners, FnListener};
/// Configuration for the bulkhead pattern.
#[derive(Clone)]
pub struct BulkheadConfig {
/// Maximum number of concurrent calls allowed.
pub(crate) max_concurrent_calls: usize,
/// Maximum time to wait for a permit.
pub(crate) max_wait_duration: Option<Duration>,
/// Name of this bulkhead instance.
pub(crate) name: String,
/// Event listeners.
pub(crate) event_listeners: EventListeners<BulkheadEvent>,
}
/// Builder for bulkhead configuration.
pub struct BulkheadConfigBuilder {
max_concurrent_calls: usize,
max_wait_duration: Option<Duration>,
name: String,
event_listeners: EventListeners<BulkheadEvent>,
}
impl BulkheadConfigBuilder {
/// Creates a new builder with default values.
pub fn new() -> Self {
Self {
max_concurrent_calls: 25,
max_wait_duration: None,
name: "bulkhead".to_string(),
event_listeners: EventListeners::new(),
}
}
/// Sets the maximum number of concurrent calls.
///
/// Default: 25
pub fn max_concurrent_calls(mut self, max: usize) -> Self {
self.max_concurrent_calls = max;
self
}
/// Sets the maximum time to wait for a permit.
///
/// If `None`, calls will wait indefinitely.
/// Default: None
pub fn max_wait_duration(mut self, duration: Option<Duration>) -> Self {
self.max_wait_duration = duration;
self
}
/// Sets the name of this bulkhead instance.
///
/// Default: "bulkhead"
pub fn name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
/// Registers a callback when a call is permitted through the bulkhead.
///
/// This callback is invoked when a request successfully acquires a permit from the bulkhead
/// and is allowed to proceed to the underlying service. The bulkhead permits calls as long
/// as the current number of concurrent calls is below the configured maximum.
///
/// # Callback Signature
/// `Fn(usize)` - Called with the current number of concurrent calls after this call was permitted.
/// This value will be between 1 and `max_concurrent_calls` (inclusive).
///
/// # Example
/// ```rust,no_run
/// use tower_resilience_bulkhead::BulkheadLayer;
///
/// let config = BulkheadLayer::builder()
/// .max_concurrent_calls(10)
/// .on_call_permitted(|concurrent| {
/// println!("Call permitted - now {} concurrent calls", concurrent);
/// if concurrent >= 8 {
/// println!("Warning: approaching capacity!");
/// }
/// })
/// .build();
/// ```
pub fn on_call_permitted<F>(mut self, f: F) -> Self
where
F: Fn(usize) + Send + Sync + 'static,
{
self.event_listeners.add(FnListener::new(move |event| {
if let BulkheadEvent::CallPermitted {
concurrent_calls, ..
} = event
{
f(*concurrent_calls);
}
}));
self
}
/// Registers a callback when a call is rejected by the bulkhead.
///
/// This callback is invoked when a request is rejected because the bulkhead is at full capacity
/// (the maximum number of concurrent calls has been reached) and the request either cannot wait
/// or has exceeded the `max_wait_duration`.
///
/// # Callback Signature
/// `Fn(usize)` - Called with the configured maximum number of concurrent calls allowed.
/// This represents the bulkhead's capacity that has been exceeded.
///
/// # Example
/// ```rust,no_run
/// use tower_resilience_bulkhead::BulkheadLayer;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// let rejection_count = Arc::new(AtomicUsize::new(0));
/// let counter = Arc::clone(&rejection_count);
///
/// let config = BulkheadLayer::builder()
/// .max_concurrent_calls(25)
/// .on_call_rejected(move |max_capacity| {
/// let count = counter.fetch_add(1, Ordering::SeqCst);
/// println!("Call rejected - bulkhead at capacity ({} max), total rejections: {}",
/// max_capacity, count + 1);
/// })
/// .build();
/// ```
pub fn on_call_rejected<F>(mut self, f: F) -> Self
where
F: Fn(usize) + Send + Sync + 'static,
{
self.event_listeners.add(FnListener::new(move |event| {
if let BulkheadEvent::CallRejected {
max_concurrent_calls,
..
} = event
{
f(*max_concurrent_calls);
}
}));
self
}
/// Registers a callback when a call finishes successfully.
///
/// This callback is invoked when a request that was permitted through the bulkhead
/// completes successfully and releases its permit. This happens regardless of the
/// response value, as long as no error occurred.
///
/// # Callback Signature
/// `Fn(Duration)` - Called with the total duration the call took to complete,
/// from when it was permitted until it finished.
///
/// # Example
/// ```rust,no_run
/// use tower_resilience_bulkhead::BulkheadLayer;
/// use std::time::Duration;
///
/// let config = BulkheadLayer::builder()
/// .max_concurrent_calls(25)
/// .on_call_finished(|duration| {
/// println!("Call completed successfully in {:?}", duration);
/// if duration > Duration::from_secs(5) {
/// println!("Warning: slow call detected");
/// }
/// })
/// .build();
/// ```
pub fn on_call_finished<F>(mut self, f: F) -> Self
where
F: Fn(Duration) + Send + Sync + 'static,
{
self.event_listeners.add(FnListener::new(move |event| {
if let BulkheadEvent::CallFinished { duration, .. } = event {
f(*duration);
}
}));
self
}
/// Registers a callback when a call fails with an error.
///
/// This callback is invoked when a request that was permitted through the bulkhead
/// fails with an error and releases its permit. The error could be from the underlying
/// service or from middleware in the chain.
///
/// # Callback Signature
/// `Fn(Duration)` - Called with the total duration the call took before failing,
/// from when it was permitted until the error occurred.
///
/// # Example
/// ```rust,no_run
/// use tower_resilience_bulkhead::BulkheadLayer;
/// use std::time::Duration;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// let error_count = Arc::new(AtomicUsize::new(0));
/// let counter = Arc::clone(&error_count);
///
/// let config = BulkheadLayer::builder()
/// .max_concurrent_calls(25)
/// .on_call_failed(move |duration| {
/// let count = counter.fetch_add(1, Ordering::SeqCst);
/// println!("Call failed after {:?} (total failures: {})", duration, count + 1);
/// })
/// .build();
/// ```
pub fn on_call_failed<F>(mut self, f: F) -> Self
where
F: Fn(Duration) + Send + Sync + 'static,
{
self.event_listeners.add(FnListener::new(move |event| {
if let BulkheadEvent::CallFailed { duration, .. } = event {
f(*duration);
}
}));
self
}
/// Builds the configuration and returns a BulkheadLayer.
pub fn build(self) -> crate::layer::BulkheadLayer {
let config = BulkheadConfig {
max_concurrent_calls: self.max_concurrent_calls,
max_wait_duration: self.max_wait_duration,
name: self.name,
event_listeners: self.event_listeners,
};
crate::layer::BulkheadLayer::new(config)
}
}
impl Default for BulkheadConfigBuilder {
fn default() -> Self {
Self::new()
}
}