tower_bulkhead/
service.rs1use crate::config::BulkheadConfig;
4use crate::error::BulkheadError;
5use crate::events::BulkheadEvent;
6use futures::future::BoxFuture;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use tokio::sync::Semaphore;
11use tower::Service;
12
13#[cfg(feature = "metrics")]
14use metrics::{counter, gauge};
15
16#[derive(Clone)]
18pub struct Bulkhead<S> {
19 inner: S,
20 semaphore: Arc<Semaphore>,
21 config: Arc<BulkheadConfig>,
22}
23
24impl<S> Bulkhead<S> {
25 pub(crate) fn new(inner: S, config: BulkheadConfig) -> Self {
27 let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
28 Self {
29 inner,
30 semaphore,
31 config: Arc::new(config),
32 }
33 }
34}
35
36impl<S, Request> Service<Request> for Bulkhead<S>
37where
38 S: Service<Request> + Clone + Send + 'static,
39 S::Future: Send + 'static,
40 S::Response: Send + 'static,
41 S::Error: From<BulkheadError> + Send + 'static,
42 Request: Send + 'static,
43{
44 type Response = S::Response;
45 type Error = S::Error;
46 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
47
48 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 self.inner.poll_ready(cx)
50 }
51
52 fn call(&mut self, request: Request) -> Self::Future {
53 let semaphore = Arc::clone(&self.semaphore);
54 let semaphore_for_check = Arc::clone(&self.semaphore);
55 let config = Arc::clone(&self.config);
56 let mut inner = self.inner.clone();
57 let start_time = Instant::now();
58
59 Box::pin(async move {
60 let permit = match config.max_wait_duration {
62 Some(duration) => {
63 match tokio::time::timeout(duration, semaphore.acquire_owned()).await {
64 Ok(Ok(permit)) => permit,
65 Ok(Err(_)) => {
66 let event = BulkheadEvent::CallRejected {
68 pattern_name: config.name.clone(),
69 timestamp: Instant::now(),
70 max_concurrent_calls: config.max_concurrent_calls,
71 };
72 config.event_listeners.emit(&event);
73
74 #[cfg(feature = "metrics")]
75 counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
76 .increment(1);
77
78 return Err(BulkheadError::BulkheadFull {
79 max_concurrent_calls: config.max_concurrent_calls,
80 }
81 .into());
82 }
83 Err(_) => {
84 let event = BulkheadEvent::CallRejected {
86 pattern_name: config.name.clone(),
87 timestamp: Instant::now(),
88 max_concurrent_calls: config.max_concurrent_calls,
89 };
90 config.event_listeners.emit(&event);
91
92 #[cfg(feature = "metrics")]
93 counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
94 .increment(1);
95
96 return Err(BulkheadError::Timeout.into());
97 }
98 }
99 }
100 None => {
101 match semaphore.acquire_owned().await {
103 Ok(permit) => permit,
104 Err(_) => {
105 let event = BulkheadEvent::CallRejected {
107 pattern_name: config.name.clone(),
108 timestamp: Instant::now(),
109 max_concurrent_calls: config.max_concurrent_calls,
110 };
111 config.event_listeners.emit(&event);
112
113 #[cfg(feature = "metrics")]
114 counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
115 .increment(1);
116
117 return Err(BulkheadError::BulkheadFull {
118 max_concurrent_calls: config.max_concurrent_calls,
119 }
120 .into());
121 }
122 }
123 }
124 };
125
126 let concurrent_calls =
128 config.max_concurrent_calls - semaphore_for_check.available_permits();
129 let event = BulkheadEvent::CallPermitted {
130 pattern_name: config.name.clone(),
131 timestamp: Instant::now(),
132 concurrent_calls,
133 };
134 config.event_listeners.emit(&event);
135
136 #[cfg(feature = "metrics")]
137 {
138 counter!("bulkhead_calls_permitted_total", "bulkhead" => config.name.clone())
139 .increment(1);
140 gauge!("bulkhead_concurrent_calls", "bulkhead" => config.name.clone())
141 .set(concurrent_calls as f64);
142 }
143
144 let result = inner.call(request).await;
146
147 drop(permit);
149
150 let duration = start_time.elapsed();
151
152 match &result {
154 Ok(_) => {
155 let event = BulkheadEvent::CallFinished {
156 pattern_name: config.name.clone(),
157 timestamp: Instant::now(),
158 duration,
159 };
160 config.event_listeners.emit(&event);
161
162 #[cfg(feature = "metrics")]
163 counter!("bulkhead_calls_finished_total", "bulkhead" => config.name.clone())
164 .increment(1);
165 }
166 Err(_) => {
167 let event = BulkheadEvent::CallFailed {
168 pattern_name: config.name.clone(),
169 timestamp: Instant::now(),
170 duration,
171 };
172 config.event_listeners.emit(&event);
173
174 #[cfg(feature = "metrics")]
175 counter!("bulkhead_calls_failed_total", "bulkhead" => config.name.clone())
176 .increment(1);
177 }
178 }
179
180 #[cfg(feature = "metrics")]
181 {
182 let new_concurrent =
183 config.max_concurrent_calls - semaphore_for_check.available_permits();
184 gauge!("bulkhead_concurrent_calls", "bulkhead" => config.name.clone())
185 .set(new_concurrent as f64);
186 }
187
188 result
189 })
190 }
191}