Skip to main content

tower_bulkhead/
service.rs

1//! Bulkhead service implementation.
2
3use crate::config::BulkheadConfig;
4use crate::error::BulkheadError;
5use crate::events::BulkheadEvent;
6use futures::future::BoxFuture;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use std::time::Instant;
10use tokio::sync::Semaphore;
11use tower::Service;
12
13#[cfg(feature = "metrics")]
14use metrics::{counter, gauge};
15
16/// Bulkhead service that limits concurrent calls.
17#[derive(Clone)]
18pub struct Bulkhead<S> {
19    inner: S,
20    semaphore: Arc<Semaphore>,
21    config: Arc<BulkheadConfig>,
22}
23
24impl<S> Bulkhead<S> {
25    /// Creates a new bulkhead service.
26    pub(crate) fn new(inner: S, config: BulkheadConfig) -> Self {
27        let semaphore = Arc::new(Semaphore::new(config.max_concurrent_calls));
28        Self {
29            inner,
30            semaphore,
31            config: Arc::new(config),
32        }
33    }
34}
35
36impl<S, Request> Service<Request> for Bulkhead<S>
37where
38    S: Service<Request> + Clone + Send + 'static,
39    S::Future: Send + 'static,
40    S::Response: Send + 'static,
41    S::Error: From<BulkheadError> + Send + 'static,
42    Request: Send + 'static,
43{
44    type Response = S::Response;
45    type Error = S::Error;
46    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
47
48    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        self.inner.poll_ready(cx)
50    }
51
52    fn call(&mut self, request: Request) -> Self::Future {
53        let semaphore = Arc::clone(&self.semaphore);
54        let semaphore_for_check = Arc::clone(&self.semaphore);
55        let config = Arc::clone(&self.config);
56        let mut inner = self.inner.clone();
57        let start_time = Instant::now();
58
59        Box::pin(async move {
60            // Try to acquire a permit
61            let permit = match config.max_wait_duration {
62                Some(duration) => {
63                    match tokio::time::timeout(duration, semaphore.acquire_owned()).await {
64                        Ok(Ok(permit)) => permit,
65                        Ok(Err(_)) => {
66                            // Semaphore was closed, shouldn't happen in normal operation
67                            let event = BulkheadEvent::CallRejected {
68                                pattern_name: config.name.clone(),
69                                timestamp: Instant::now(),
70                                max_concurrent_calls: config.max_concurrent_calls,
71                            };
72                            config.event_listeners.emit(&event);
73
74                            #[cfg(feature = "metrics")]
75                            counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
76                                .increment(1);
77
78                            return Err(BulkheadError::BulkheadFull {
79                                max_concurrent_calls: config.max_concurrent_calls,
80                            }
81                            .into());
82                        }
83                        Err(_) => {
84                            // Timeout
85                            let event = BulkheadEvent::CallRejected {
86                                pattern_name: config.name.clone(),
87                                timestamp: Instant::now(),
88                                max_concurrent_calls: config.max_concurrent_calls,
89                            };
90                            config.event_listeners.emit(&event);
91
92                            #[cfg(feature = "metrics")]
93                            counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
94                                .increment(1);
95
96                            return Err(BulkheadError::Timeout.into());
97                        }
98                    }
99                }
100                None => {
101                    // Wait indefinitely
102                    match semaphore.acquire_owned().await {
103                        Ok(permit) => permit,
104                        Err(_) => {
105                            // Semaphore was closed
106                            let event = BulkheadEvent::CallRejected {
107                                pattern_name: config.name.clone(),
108                                timestamp: Instant::now(),
109                                max_concurrent_calls: config.max_concurrent_calls,
110                            };
111                            config.event_listeners.emit(&event);
112
113                            #[cfg(feature = "metrics")]
114                            counter!("bulkhead_calls_rejected_total", "bulkhead" => config.name.clone())
115                                .increment(1);
116
117                            return Err(BulkheadError::BulkheadFull {
118                                max_concurrent_calls: config.max_concurrent_calls,
119                            }
120                            .into());
121                        }
122                    }
123                }
124            };
125
126            // Emit call permitted event
127            let concurrent_calls =
128                config.max_concurrent_calls - semaphore_for_check.available_permits();
129            let event = BulkheadEvent::CallPermitted {
130                pattern_name: config.name.clone(),
131                timestamp: Instant::now(),
132                concurrent_calls,
133            };
134            config.event_listeners.emit(&event);
135
136            #[cfg(feature = "metrics")]
137            {
138                counter!("bulkhead_calls_permitted_total", "bulkhead" => config.name.clone())
139                    .increment(1);
140                gauge!("bulkhead_concurrent_calls", "bulkhead" => config.name.clone())
141                    .set(concurrent_calls as f64);
142            }
143
144            // Call the inner service
145            let result = inner.call(request).await;
146
147            // Drop the permit to release the slot
148            drop(permit);
149
150            let duration = start_time.elapsed();
151
152            // Emit completion event
153            match &result {
154                Ok(_) => {
155                    let event = BulkheadEvent::CallFinished {
156                        pattern_name: config.name.clone(),
157                        timestamp: Instant::now(),
158                        duration,
159                    };
160                    config.event_listeners.emit(&event);
161
162                    #[cfg(feature = "metrics")]
163                    counter!("bulkhead_calls_finished_total", "bulkhead" => config.name.clone())
164                        .increment(1);
165                }
166                Err(_) => {
167                    let event = BulkheadEvent::CallFailed {
168                        pattern_name: config.name.clone(),
169                        timestamp: Instant::now(),
170                        duration,
171                    };
172                    config.event_listeners.emit(&event);
173
174                    #[cfg(feature = "metrics")]
175                    counter!("bulkhead_calls_failed_total", "bulkhead" => config.name.clone())
176                        .increment(1);
177                }
178            }
179
180            #[cfg(feature = "metrics")]
181            {
182                let new_concurrent =
183                    config.max_concurrent_calls - semaphore_for_check.available_permits();
184                gauge!("bulkhead_concurrent_calls", "bulkhead" => config.name.clone())
185                    .set(new_concurrent as f64);
186            }
187
188            result
189        })
190    }
191}