Skip to main content

tower_resilience_adaptive/
lib.rs

1//! Adaptive concurrency limiter for Tower services.
2//!
3//! This crate provides a Tower layer that dynamically adjusts concurrency limits
4//! based on observed latency and error rates, using algorithms like AIMD or Vegas.
5//!
6//! Unlike static concurrency limits, which require manual tuning, adaptive limiters
7//! automatically find the optimal concurrency for your downstream services.
8//!
9//! # Algorithms
10//!
11//! ## AIMD (Additive Increase Multiplicative Decrease)
12//!
13//! The classic TCP congestion control algorithm:
14//! - On success with low latency: increase limit by a fixed amount
15//! - On failure or high latency: decrease limit by a factor (e.g., halve it)
16//!
17//! This creates a "sawtooth" pattern as it continuously probes for capacity.
18//!
19//! ## Vegas
20//!
21//! A more sophisticated algorithm that uses RTT measurements:
22//! - Estimates queue depth from RTT variations
23//! - Increases limit when queue is small (under-utilized)
24//! - Decreases limit when queue is large (congested)
25//!
26//! Vegas is more stable than AIMD and avoids the sawtooth pattern.
27//!
28//! # Example
29//!
30//! ```rust
31//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Aimd};
32//! use tower::{Service, ServiceBuilder, ServiceExt};
33//! use std::time::Duration;
34//!
35//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
36//! // Create a service
37//! let service = tower::service_fn(|req: String| async move {
38//!     Ok::<_, std::convert::Infallible>(format!("Hello, {}!", req))
39//! });
40//!
41//! // Wrap with adaptive limiter using AIMD
42//! let mut service = ServiceBuilder::new()
43//!     .layer(AdaptiveLimiterLayer::new(
44//!         Aimd::builder()
45//!             .initial_limit(10)
46//!             .min_limit(1)
47//!             .max_limit(100)
48//!             .latency_threshold(Duration::from_millis(100))
49//!             .build()
50//!     ))
51//!     .service(service);
52//!
53//! // The limit will automatically adjust based on response latency
54//! let response = service.ready().await?.call("World".to_string()).await?;
55//! # Ok(())
56//! # }
57//! ```
58//!
59//! # Using Vegas Algorithm
60//!
61//! ```rust,no_run
62//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Vegas};
63//! use tower::ServiceBuilder;
64//!
65//! let layer = AdaptiveLimiterLayer::new(
66//!     Vegas::builder()
67//!         .initial_limit(10)
68//!         .alpha(3)  // Increase when queue < 3
69//!         .beta(6)   // Decrease when queue > 6
70//!         .build()
71//! );
72//! ```
73//!
74//! # Combining with Other Patterns
75//!
76//! The adaptive limiter works well with other resilience patterns:
77//!
78//! ```rust,ignore
79//! use tower_resilience_adaptive::{AdaptiveLimiterLayer, Aimd};
80//! use tower_resilience_retry::RetryLayer;
81//! use tower_resilience_circuitbreaker::CircuitBreakerLayer;
82//! use tower::ServiceBuilder;
83//!
84//! let service = ServiceBuilder::new()
85//!     // Outer: circuit breaker for catastrophic failures
86//!     .layer(CircuitBreakerLayer::builder().build())
87//!     // Middle: adaptive concurrency limiting
88//!     .layer(AdaptiveLimiterLayer::new(Aimd::builder().build()))
89//!     // Inner: retry transient failures
90//!     .layer(RetryLayer::builder().max_attempts(3).build())
91//!     .service(my_service);
92//! ```
93//!
94//! # Prior Art
95//!
96//! This implementation is inspired by:
97//! - [Netflix concurrency-limits](https://github.com/Netflix/concurrency-limits)
98//! - [Uber Cinnamon](https://www.uber.com/blog/cinnamon-auto-tuner-adaptive-concurrency-in-the-wild/)
99//! - [Vector Adaptive Request Concurrency](https://vector.dev/blog/adaptive-request-concurrency/)
100
101mod algorithm;
102mod layer;
103mod service;
104
105pub use algorithm::{Aimd, AimdBuilder, Algorithm, ConcurrencyAlgorithm, Vegas, VegasBuilder};
106pub use layer::{AdaptiveLimiterLayer, AdaptiveLimiterLayerBuilder, IntoLayer};
107pub use service::{AdaptiveError, AdaptiveFuture, AdaptiveService};
108
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use tower::{Service, ServiceBuilder, ServiceExt};

    /// Upper bound on how long `test_concurrent_requests` waits for all spawned
    /// requests. Each request only sleeps for 10ms, so hitting this bound means
    /// a request never got through the limiter rather than a slow machine.
    const CONCURRENT_REQUESTS_TIMEOUT: Duration = Duration::from_secs(5);

    /// A single request through an AIMD-limited service returns the inner
    /// service's response unchanged.
    #[tokio::test]
    async fn test_basic_aimd() {
        let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(
                Aimd::builder()
                    .initial_limit(10)
                    .latency_threshold(Duration::from_secs(1))
                    .build(),
            ))
            .service(service);

        let response = service.ready().await.unwrap().call(21).await.unwrap();
        assert_eq!(response, 42);
    }

    /// Successful responses that finish well under the latency threshold must
    /// push the AIMD limit above its initial value.
    #[tokio::test]
    async fn test_limit_increases_on_fast_responses() {
        let service = tower::service_fn(|_req: ()| async {
            // Fast response
            Ok::<_, &str>(())
        });

        let algorithm = Aimd::builder()
            .initial_limit(10)
            .increase_by(1)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let initial_limit = algorithm.limit();
        // Shared so the test can read the limit after the service has updated it.
        let algorithm = Arc::new(algorithm);

        let mut service = AdaptiveService::new(service, Arc::clone(&algorithm));

        // Make several requests
        for _ in 0..5 {
            service.ready().await.unwrap().call(()).await.unwrap();
        }

        // Limit should have increased
        assert!(algorithm.limit() > initial_limit);
    }

    /// A failing response must lower the AIMD limit relative to the value it
    /// had after a run of successful responses.
    #[tokio::test]
    async fn test_limit_decreases_on_errors() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = Arc::clone(&call_count);

        // Inner service: the first five calls succeed, every later call fails.
        let service = tower::service_fn(move |_req: ()| {
            let count = cc.fetch_add(1, Ordering::SeqCst);
            async move {
                if count < 5 {
                    Ok::<_, &str>(())
                } else {
                    Err("error")
                }
            }
        });

        let algorithm = Aimd::builder()
            .initial_limit(20)
            .decrease_factor(0.5)
            .latency_threshold(Duration::from_secs(1))
            .build();

        let algorithm = Arc::new(algorithm);
        let mut service = AdaptiveService::new(service, Arc::clone(&algorithm));

        // Make some successful requests. Unwrap each result so the baseline
        // captured below is known to come from five real successes.
        for _ in 0..5 {
            service.ready().await.unwrap().call(()).await.unwrap();
        }

        let limit_before_error = algorithm.limit();

        // Make a failing request, and confirm that it really failed: otherwise
        // the final assertion would not be testing the error path at all.
        let failing = service.ready().await.unwrap().call(()).await;
        assert!(failing.is_err(), "sixth call should surface an error");

        // Limit should have decreased
        assert!(algorithm.limit() < limit_before_error);
    }

    /// A single request through a Vegas-limited service returns the inner
    /// service's response unchanged.
    #[tokio::test]
    async fn test_vegas_basic() {
        let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(
                Vegas::builder().initial_limit(10).build(),
            ))
            .service(service);

        let response = service.ready().await.unwrap().call(21).await.unwrap();
        assert_eq!(response, 42);
    }

    /// Ten concurrent requests against a limiter that starts at a limit of five
    /// must all complete successfully.
    #[tokio::test]
    async fn test_concurrent_requests() {
        let service = tower::service_fn(|_req: ()| async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok::<_, &str>(())
        });

        let service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(
                Aimd::builder()
                    .initial_limit(5)
                    .latency_threshold(Duration::from_secs(1))
                    .build(),
            ))
            .service(service);

        // Spawn concurrent requests, each on its own clone of the service.
        let mut handles = vec![];
        for _ in 0..10 {
            let mut svc = service.clone();
            handles.push(tokio::spawn(async move {
                svc.ready().await.unwrap().call(()).await.unwrap();
            }));
        }

        // All should complete eventually. The timeout turns a request that
        // never gets through into a test failure instead of a hung test run.
        let join_all = async {
            for handle in handles {
                handle.await.unwrap();
            }
        };
        tokio::time::timeout(CONCURRENT_REQUESTS_TIMEOUT, join_all)
            .await
            .expect("concurrent requests did not all complete within the timeout");
    }

    /// The layer accepts the `Algorithm` enum wrapper as well as a concrete
    /// algorithm type.
    #[tokio::test]
    async fn test_algorithm_enum() {
        let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req) });

        // Test with Algorithm enum
        let algorithm = Algorithm::Aimd(Aimd::builder().initial_limit(10).build());

        let mut service = ServiceBuilder::new()
            .layer(AdaptiveLimiterLayer::new(algorithm))
            .service(service);

        let response = service.ready().await.unwrap().call(42).await.unwrap();
        assert_eq!(response, 42);
    }
}
259}