mod algorithm;
mod layer;
mod service;
pub use algorithm::{Aimd, AimdBuilder, Algorithm, ConcurrencyAlgorithm, Vegas, VegasBuilder};
pub use layer::{AdaptiveLimiterLayer, AdaptiveLimiterLayerBuilder, IntoLayer};
pub use service::{AdaptiveError, AdaptiveFuture, AdaptiveService};
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tower::{Service, ServiceBuilder, ServiceExt};
#[tokio::test]
async fn test_basic_aimd() {
let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });
let mut service = ServiceBuilder::new()
.layer(AdaptiveLimiterLayer::new(
Aimd::builder()
.initial_limit(10)
.latency_threshold(Duration::from_secs(1))
.build(),
))
.service(service);
let response = service.ready().await.unwrap().call(21).await.unwrap();
assert_eq!(response, 42);
}
#[tokio::test]
async fn test_limit_increases_on_fast_responses() {
let service = tower::service_fn(|_req: ()| async {
Ok::<_, &str>(())
});
let algorithm = Aimd::builder()
.initial_limit(10)
.increase_by(1)
.latency_threshold(Duration::from_secs(1))
.build();
let initial_limit = algorithm.limit();
let algorithm = Arc::new(algorithm);
let mut service = AdaptiveService::new(service, Arc::clone(&algorithm));
for _ in 0..5 {
service.ready().await.unwrap().call(()).await.unwrap();
}
assert!(algorithm.limit() > initial_limit);
}
#[tokio::test]
async fn test_limit_decreases_on_errors() {
let call_count = Arc::new(AtomicUsize::new(0));
let cc = Arc::clone(&call_count);
let service = tower::service_fn(move |_req: ()| {
let count = cc.fetch_add(1, Ordering::SeqCst);
async move {
if count < 5 {
Ok::<_, &str>(())
} else {
Err("error")
}
}
});
let algorithm = Aimd::builder()
.initial_limit(20)
.decrease_factor(0.5)
.latency_threshold(Duration::from_secs(1))
.build();
let algorithm = Arc::new(algorithm);
let mut service = AdaptiveService::new(service, Arc::clone(&algorithm));
for _ in 0..5 {
let _ = service.ready().await.unwrap().call(()).await;
}
let limit_before_error = algorithm.limit();
let _ = service.ready().await.unwrap().call(()).await;
assert!(algorithm.limit() < limit_before_error);
}
#[tokio::test]
async fn test_vegas_basic() {
let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req * 2) });
let mut service = ServiceBuilder::new()
.layer(AdaptiveLimiterLayer::new(
Vegas::builder().initial_limit(10).build(),
))
.service(service);
let response = service.ready().await.unwrap().call(21).await.unwrap();
assert_eq!(response, 42);
}
#[tokio::test]
async fn test_concurrent_requests() {
let service = tower::service_fn(|_req: ()| async {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok::<_, &str>(())
});
let service = ServiceBuilder::new()
.layer(AdaptiveLimiterLayer::new(
Aimd::builder()
.initial_limit(5)
.latency_threshold(Duration::from_secs(1))
.build(),
))
.service(service);
let mut handles = vec![];
for _ in 0..10 {
let mut svc = service.clone();
handles.push(tokio::spawn(async move {
svc.ready().await.unwrap().call(()).await.unwrap();
}));
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_algorithm_enum() {
let service = tower::service_fn(|req: i32| async move { Ok::<_, &str>(req) });
let algorithm = Algorithm::Aimd(Aimd::builder().initial_limit(10).build());
let mut service = ServiceBuilder::new()
.layer(AdaptiveLimiterLayer::new(algorithm))
.service(service);
let response = service.ready().await.unwrap().call(42).await.unwrap();
assert_eq!(response, 42);
}
}