mod common;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use axum::Router;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum_server::tls_rustls::RustlsConfig;
use common::*;
use noxy::http::HttpService;
use noxy::middleware::Retry;
use rcgen::{CertificateParams, KeyPair};
use tower::Layer;
/// The upstream answers 503 to its first two requests and 200 "hello" from the
/// third one on. With `Retry::on_statuses([503])` capped at three retries, the
/// client is expected to receive the 200 after exactly three upstream hits.
#[tokio::test]
async fn retry_retries_on_503() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    // Request counter shared between the handler and the final assertion.
    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // `fetch_add` yields the value *before* the increment.
                match hits.fetch_add(1, Ordering::SeqCst) {
                    0 | 1 => {
                        (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
                    }
                    _ => "hello".into_response(),
                }
            }
        }),
    );

    // Serve on an ephemeral port; the handle reports the bound address.
    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(3)
            .backoff(Duration::from_millis(10));
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "hello");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// The upstream always answers 200, but the first two bodies start with
/// "error". A custom policy that requests a retry whenever the body starts with
/// `b"error"` should make the client end up with "success" after three hits.
#[tokio::test]
async fn retry_custom_policy_inspects_body() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // `prior` is the number of requests seen before this one.
                let prior = hits.fetch_add(1, Ordering::SeqCst);
                if prior >= 2 {
                    "success"
                } else {
                    "error: temporary failure"
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        // Policy: `Some(10ms)` when the body begins with "error", `None` otherwise.
        let retry = Retry::default().max_retries(3).policy(|resp, _attempt| {
            resp.body()
                .starts_with(b"error")
                .then_some(Duration::from_millis(10))
        });
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "success");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// The upstream tags its first two responses with an `x-retry` header. A
/// header-only policy that returns a delay whenever that header is present
/// should make the client end up with "ready" after three upstream hits.
#[tokio::test]
async fn retry_policy_headers_checks_custom_header() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // `fetch_add` yields the value *before* the increment.
                match hits.fetch_add(1, Ordering::SeqCst) {
                    0 | 1 => (
                        [(http::header::HeaderName::from_static("x-retry"), "true")],
                        "not ready",
                    )
                        .into_response(),
                    _ => "ready".into_response(),
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        // Policy: `Some(10ms)` iff the response carries `x-retry`.
        let retry = Retry::default()
            .max_retries(3)
            .policy_headers(|parts, _attempt| {
                parts
                    .headers
                    .get("x-retry")
                    .map(|_| Duration::from_millis(10))
            });
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "ready");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// The upstream answers 503 to every request. With `max_retries(2)` the test
/// expects the client to see the 503 and the upstream to have been hit three
/// times (the initial attempt plus two retries).
#[tokio::test]
async fn retry_exhausts_max_retries_returns_last_response() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // Count the request, then fail unconditionally.
                hits.fetch_add(1, Ordering::SeqCst);
                (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(2)
            .backoff(Duration::from_millis(10));
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 503);
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// The upstream answers its first request with 503 and `Retry-After: 1`, and
/// every later request with 200 "hello". No explicit backoff is configured on
/// the retry layer, so the test expects the delay to come from the header:
/// the whole exchange must take at least one second.
///
/// The upstream hit count is asserted as well, so the test fails if the proxy
/// sends more (or fewer) requests than the initial attempt plus one retry.
#[tokio::test]
async fn retry_respects_retry_after_header() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let key_pair = KeyPair::generate().unwrap();
    let params = CertificateParams::new(vec!["localhost".to_string()]).unwrap();
    let cert = params.self_signed(&key_pair).unwrap();
    let cert_der = cert.der().to_vec();
    let key_der = key_pair.serialized_der().to_vec();
    let config = RustlsConfig::from_der(vec![cert_der], key_der)
        .await
        .unwrap();

    // Request counter shared between the handler and the final assertion.
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = counter.clone();
    let app = Router::new().route(
        "/",
        get(move || {
            let counter = counter_clone.clone();
            async move {
                // `fetch_add` yields the value *before* the increment, so only
                // the very first request sees `n == 0`.
                let n = counter.fetch_add(1, Ordering::SeqCst);
                if n == 0 {
                    (
                        http::StatusCode::SERVICE_UNAVAILABLE,
                        [(http::header::RETRY_AFTER, "1")],
                        "unavailable",
                    )
                        .into_response()
                } else {
                    "hello".into_response()
                }
            }
        }),
    );

    // Serve on an ephemeral port; the handle reports the bound address.
    let handle = axum_server::Handle::new();
    let listener_handle = handle.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), config)
            .handle(handle)
            .serve(app.into_make_service())
            .await
            .unwrap();
    });
    let upstream_addr = listener_handle.listening().await.unwrap();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let layer = Retry::on_statuses([503]).max_retries(3);
        tower::util::BoxService::new(layer.layer(inner))
    })])
    .await;

    let client = http_client(proxy_addr);
    let start = std::time::Instant::now();
    let resp = client
        .get(format!("https://localhost:{}/", upstream_addr.port()))
        .send()
        .await
        .unwrap();
    let elapsed = start.elapsed();

    assert_eq!(resp.status(), 200);
    assert_eq!(resp.text().await.unwrap(), "hello");
    assert!(
        elapsed >= Duration::from_secs(1),
        "should have respected Retry-After: 1, but took {elapsed:?}"
    );
    // One 503 followed by one successful retry: exactly two upstream hits.
    assert_eq!(counter.load(Ordering::SeqCst), 2);
}
/// The upstream answers 503 to every POST. The retry layer is limited to
/// replaying 4 body bytes while the request body is longer than that, so the
/// test expects the 503 to be returned after a single upstream hit.
#[tokio::test]
async fn retry_skips_when_body_exceeds_replay_limit() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        post(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // Count the request, then fail unconditionally.
                hits.fetch_add(1, Ordering::SeqCst);
                (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(3)
            .max_replay_body_bytes(4)
            .backoff(Duration::from_millis(10));
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .post(format!("https://localhost:{upstream_port}/"))
        .body("this body is larger than 4 bytes")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 503);
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
/// The upstream answers 503 to its first two POSTs and 200 "ok" afterwards,
/// checking on every request that the body is "hello". The replay limit is 5
/// bytes and the body is 5 bytes long, so the test expects retries to happen
/// and the upstream to be hit three times.
#[tokio::test]
async fn retry_retries_when_body_within_replay_limit() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        post(move |payload: String| {
            let hits = Arc::clone(&handler_hits);
            async move {
                let prior = hits.fetch_add(1, Ordering::SeqCst);
                // Every attempt, including replays, must carry the original body.
                assert_eq!(payload, "hello");
                if prior >= 2 {
                    "ok".into_response()
                } else {
                    (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(3)
            .max_replay_body_bytes(5)
            .backoff(Duration::from_millis(10));
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .post(format!("https://localhost:{upstream_port}/"))
        .body("hello")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "ok");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// Boundary case: the request body ("abcd") is exactly as long as the 4-byte
/// replay limit. The upstream fails only the first POST, so the test expects a
/// single retry, a 200 "ok" response, and two upstream hits in total.
#[tokio::test]
async fn retry_retries_when_body_equals_replay_limit() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        post(move |payload: String| {
            let hits = Arc::clone(&handler_hits);
            async move {
                let prior = hits.fetch_add(1, Ordering::SeqCst);
                // Every attempt, including the replay, must carry the original body.
                assert_eq!(payload, "abcd");
                match prior {
                    0 => (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response(),
                    _ => "ok".into_response(),
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(2)
            .max_replay_body_bytes(4)
            .backoff(Duration::from_millis(10));
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .post(format!("https://localhost:{upstream_port}/"))
        .body("abcd")
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "ok");
    assert_eq!(hits.load(Ordering::SeqCst), 2);
}
/// The upstream answers 503 to its first two requests and 200 "hello"
/// afterwards. The retry layer is configured with `budget_min_retries(0)` and
/// `budget(0.8)`; the test expects both retries to be permitted, i.e. a 200
/// response after three upstream hits.
#[tokio::test]
async fn retry_budget_allows_retries_under_limit() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // `fetch_add` yields the value *before* the increment.
                match hits.fetch_add(1, Ordering::SeqCst) {
                    0 | 1 => {
                        (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
                    }
                    _ => "hello".into_response(),
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(5)
            .backoff(Duration::from_millis(10))
            .budget_min_retries(0)
            .budget(0.8);
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "hello");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}
/// The upstream answers 503 to every request. The retry layer is configured
/// with `budget_min_retries(0)` and `budget(0.0)`; the test expects no retry to
/// be issued, i.e. the client sees the 503 after a single upstream hit even
/// though `max_retries` is 5.
#[tokio::test]
async fn retry_budget_blocks_when_exhausted() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // Count the request, then fail unconditionally.
                hits.fetch_add(1, Ordering::SeqCst);
                (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(5)
            .backoff(Duration::from_millis(10))
            .budget_min_retries(0)
            .budget(0.0);
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 503);
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
/// The upstream answers 503 to its first two requests and 200 "hello"
/// afterwards. The retry layer is configured with `budget(0.0)` but
/// `budget_min_retries(10)`; the test expects retries to still go through,
/// i.e. a 200 response after three upstream hits.
#[tokio::test]
async fn retry_budget_min_retries_floor() {
    install_crypto_provider();

    // Self-signed certificate for the TLS upstream.
    let signing_key = KeyPair::generate().unwrap();
    let self_signed = CertificateParams::new(vec!["localhost".to_string()])
        .unwrap()
        .self_signed(&signing_key)
        .unwrap();
    let tls = RustlsConfig::from_der(
        vec![self_signed.der().to_vec()],
        signing_key.serialized_der().to_vec(),
    )
    .await
    .unwrap();

    let hits = Arc::new(AtomicUsize::new(0));
    let handler_hits = Arc::clone(&hits);
    let upstream = Router::new().route(
        "/",
        get(move || {
            let hits = Arc::clone(&handler_hits);
            async move {
                // `prior` is the number of requests seen before this one.
                let prior = hits.fetch_add(1, Ordering::SeqCst);
                if prior >= 2 {
                    "hello".into_response()
                } else {
                    (http::StatusCode::SERVICE_UNAVAILABLE, "unavailable").into_response()
                }
            }
        }),
    );

    let server = axum_server::Handle::new();
    let serving = server.clone();
    tokio::spawn(async move {
        axum_server::bind_rustls("127.0.0.1:0".parse().unwrap(), tls)
            .handle(serving)
            .serve(upstream.into_make_service())
            .await
            .unwrap();
    });
    let upstream_port = server.listening().await.unwrap().port();

    let proxy_addr = start_proxy(vec![Box::new(|inner: HttpService| {
        let retry = Retry::on_statuses([503])
            .max_retries(5)
            .backoff(Duration::from_millis(10))
            .budget_min_retries(10)
            .budget(0.0);
        tower::util::BoxService::new(retry.layer(inner))
    })])
    .await;

    let proxied_client = http_client(proxy_addr);
    let response = proxied_client
        .get(format!("https://localhost:{upstream_port}/"))
        .send()
        .await
        .unwrap();

    assert_eq!(response.status(), 200);
    assert_eq!(response.text().await.unwrap(), "hello");
    assert_eq!(hits.load(Ordering::SeqCst), 3);
}