tower_resilience_adaptive/lib.rs
1//! Adaptive concurrency limiter for Tower services.
2//!
3//! This crate provides a Tower layer that dynamically adjusts concurrency limits
4//! based on observed latency and error rates, using algorithms like AIMD or Vegas.
5//!
6//! Unlike static concurrency limits which require manual tuning, adaptive limiters
7//! automatically find the optimal concurrency for your downstream services.
8//!
9//! # Algorithms
10//!
11//! ## AIMD (Additive Increase Multiplicative Decrease)
12//!
13//! The classic TCP congestion control algorithm:
14//! - On success with low latency: increase limit by a fixed amount
15//! - On failure or high latency: decrease limit by a factor (e.g., halve it)
16//!
17//! This creates a "sawtooth" pattern as it continuously probes for capacity.
18//!
19//! ## Vegas
20//!
21//! A more sophisticated algorithm that uses RTT measurements:
22//! - Estimates queue depth from RTT variations
23//! - Increases limit when queue is small (under-utilized)
24//! - Decreases limit when queue is large (congested)
25//!
26//! Vegas is more stable than AIMD and avoids the sawtooth pattern.
27//!
28//! # Example
29//!
30//! ```rust
31//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Aimd};
32//! use tower::{Service, ServiceBuilder, ServiceExt};
33//! use std::time::Duration;
34//!
35//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
36//! // Create a service
37//! let service = tower::service_fn(|req: String| async move {
38//! Ok::<_, std::convert::Infallible>(format!("Hello, {}!", req))
39//! });
40//!
41//! // Wrap with adaptive limiter using AIMD
42//! let mut service = ServiceBuilder::new()
43//! .layer(AdaptiveLimiterLayer::new(
44//! Aimd::builder()
45//! .initial_limit(10)
46//! .min_limit(1)
47//! .max_limit(100)
48//! .latency_threshold(Duration::from_millis(100))
49//! .build()
50//! ))
51//! .service(service);
52//!
53//! // The limit will automatically adjust based on response latency
54//! let response = service.ready().await?.call("World".to_string()).await?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! # Using Vegas Algorithm
60//!
61//! ```rust,no_run
62//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Vegas};
63//! use tower::ServiceBuilder;
64//!
65//! let layer = AdaptiveLimiterLayer::new(
66//! Vegas::builder()
67//! .initial_limit(10)
68//! .alpha(3) // Increase when queue < 3
69//! .beta(6) // Decrease when queue > 6
70//! .build()
71//! );
72//! ```
73//!
74//! # Combining with Other Patterns
75//!
76//! The adaptive limiter works well with other resilience patterns:
77//!
78//! ```rust,ignore
79//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Aimd};
80//! use tower_resilience_retry::RetryLayer;
81//! use tower_resilience_circuitbreaker::CircuitBreakerLayer;
82//! use tower::ServiceBuilder;
83//!
84//! let service = ServiceBuilder::new()
85//! // Outer: circuit breaker for catastrophic failures
86//! .layer(CircuitBreakerLayer::builder().build())
87//! // Middle: adaptive concurrency limiting
88//! .layer(AdaptiveLimiterLayer::new(Aimd::builder().build()))
89//! // Inner: retry transient failures
90//! .layer(RetryLayer::builder().max_attempts(3).build())
91//! .service(my_service);
92//! ```
93//!
94//! # Prior Art
95//!
96//! This implementation is inspired by:
97//! - [Netflix concurrency-limits](https://github.com/Netflix/concurrency-limits)
98//! - [Uber Cinnamon](https://www.uber.com/blog/cinnamon-auto-tuner-adaptive-concurrency-in-the-wild/)
99//! - [Vector Adaptive Request Concurrency](https://vector.dev/blog/adaptive-request-concurrency/)
100
101mod algorithm;
102mod layer;
103mod service;
104
105pub use algorithm::{Aimd, AimdBuilder, Algorithm, ConcurrencyAlgorithm, Vegas, VegasBuilder};
106pub use layer::{AdaptiveLimiterLayer, AdaptiveLimiterLayerBuilder, IntoLayer};
107pub use service::{AdaptiveError, AdaptiveFuture, AdaptiveService};
108
#[cfg(test)]
mod tests {
    //! Integration-style tests that drive the limiter through its public API:
    //! the layer, the service wrapper, and the algorithm handles.

    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use tower::{Service, ServiceBuilder, ServiceExt};

    /// A single request passes through an AIMD-limited service built via the layer
    /// and the inner service's response is returned to the caller.
    #[tokio::test]
    async fn test_basic_aimd() {
        let inner = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });

        let aimd = Aimd::builder()
            .initial_limit(10)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(aimd))
            .service(inner);

        let response = service.ready().await.unwrap().call(21).await.unwrap();
        assert_eq!(response, 42);
    }

    /// Successful responses that finish well under the latency threshold must
    /// raise the AIMD limit above its initial value.
    #[tokio::test]
    async fn test_limit_increases_on_fast_responses() {
        // Completes immediately, far below the 1s latency threshold used below.
        let inner = tower::service_fn(|_req: ()| async { Ok::<_, &str>(()) });

        let algorithm = Aimd::builder()
            .initial_limit(10)
            .increase_by(1)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let initial_limit = algorithm.limit();
        let algorithm = Arc::new(algorithm);

        let mut service = AdaptiveService::new(inner, Arc::clone(&algorithm));

        for _ in 0..5 {
            service.ready().await.unwrap().call(()).await.unwrap();
        }

        assert!(
            algorithm.limit() > initial_limit,
            "limit should grow after fast successful responses"
        );
    }

    /// A failing response must lower the AIMD limit.
    ///
    /// The inner service succeeds for its first five calls and fails afterwards.
    /// Every outcome is checked explicitly so the final limit comparison cannot
    /// pass because of an unexpected earlier failure or an unexpected success.
    #[tokio::test]
    async fn test_limit_decreases_on_errors() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = Arc::clone(&call_count);

        let inner = tower::service_fn(move |_req: ()| {
            let count = cc.fetch_add(1, Ordering::SeqCst);
            async move {
                if count < 5 {
                    Ok::<_, &str>(())
                } else {
                    Err("error")
                }
            }
        });

        let algorithm = Aimd::builder()
            .initial_limit(20)
            .decrease_factor(0.5)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let algorithm = Arc::new(algorithm);
        let mut service = AdaptiveService::new(inner, Arc::clone(&algorithm));

        // Warm-up: these five requests are expected to succeed, so unwrap them
        // instead of discarding the result.
        for _ in 0..5 {
            service.ready().await.unwrap().call(()).await.unwrap();
        }

        let limit_before_error = algorithm.limit();

        // The sixth request is the one the inner service rejects.
        let outcome = service.ready().await.unwrap().call(()).await;
        assert!(outcome.is_err(), "sixth request should fail");

        // Every request must have reached the inner service exactly once.
        assert_eq!(call_count.load(Ordering::SeqCst), 6);

        assert!(
            algorithm.limit() < limit_before_error,
            "limit should shrink after a failed response"
        );
    }

    /// A single request passes through a Vegas-limited service built via the layer.
    #[tokio::test]
    async fn test_vegas_basic() {
        let inner = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });

        let vegas = Vegas::builder().initial_limit(10).build();

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(vegas))
            .service(inner);

        let response = service.ready().await.unwrap().call(21).await.unwrap();
        assert_eq!(response, 42);
    }

    /// Ten concurrent requests against an initial limit of five must all
    /// complete; requests over the limit have to wait rather than fail or hang.
    #[tokio::test]
    async fn test_concurrent_requests() {
        let inner = tower::service_fn(|_req: ()| async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok::<_, &str>(())
        });

        let aimd = Aimd::builder()
            .initial_limit(5)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(aimd))
            .service(inner);

        // Each task drives its own clone of the service.
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let mut svc = service.clone();
                tokio::spawn(async move {
                    svc.ready().await.unwrap().call(()).await.unwrap();
                })
            })
            .collect();

        // All should complete eventually; a panic inside a task surfaces here.
        for handle in handles {
            handle.await.unwrap();
        }
    }

    /// The `Algorithm` enum wrapper is accepted by the layer just like a
    /// concrete algorithm type.
    #[tokio::test]
    async fn test_algorithm_enum() {
        let inner = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req) });

        let algorithm = Algorithm::Aimd(Aimd::builder().initial_limit(10).build());

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(algorithm))
            .service(inner);

        let response = service.ready().await.unwrap().call(42).await.unwrap();
        assert_eq!(response, 42);
    }
}
259}