use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use crw_core::Deadline;
fn tdl() -> Deadline {
Deadline::now_plus(Duration::from_secs(60))
}
use axum::Router;
use axum::extract::State;
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::get;
use crw_renderer::http_only::HttpFetcher;
use crw_renderer::traits::PageFetcher;
use flate2::Compression;
use flate2::write::GzEncoder;
use tokio::net::TcpListener;
async fn spawn_server() -> String {
let app = Router::new()
.route("/gzip", get(gzip_handler))
.route("/brotli", get(brotli_handler));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{}", addr)
}
fn gzip_bytes(body: &[u8]) -> Vec<u8> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(body).unwrap();
encoder.finish().unwrap()
}
fn brotli_bytes(body: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
{
let mut writer = brotli::CompressorWriter::new(&mut out, 4096, 5, 22);
writer.write_all(body).unwrap();
}
out
}
async fn gzip_handler() -> impl axum::response::IntoResponse {
let body = gzip_bytes(b"<html><body><h1>gzip works</h1><p>decoded</p></body></html>");
(
StatusCode::OK,
[
(header::CONTENT_TYPE, "text/html; charset=utf-8"),
(header::CONTENT_ENCODING, "gzip"),
],
body,
)
}
async fn brotli_handler() -> impl axum::response::IntoResponse {
let body = brotli_bytes(b"<html><body><h1>brotli works</h1><p>decoded</p></body></html>");
(
StatusCode::OK,
[
(header::CONTENT_TYPE, "text/html; charset=utf-8"),
(header::CONTENT_ENCODING, "br"),
],
body,
)
}
#[tokio::test]
async fn http_fetcher_decodes_gzip_and_brotli_responses() {
let base_url = spawn_server().await;
let fetcher = HttpFetcher::new("crw-test", None, true);
let gzip_result = fetcher
.fetch(&format!("{base_url}/gzip"), &HashMap::new(), None, tdl())
.await
.unwrap();
assert!(gzip_result.html.contains("gzip works"));
assert!(!gzip_result.html.contains('\u{1f}'));
let brotli_result = fetcher
.fetch(&format!("{base_url}/brotli"), &HashMap::new(), None, tdl())
.await
.unwrap();
assert!(brotli_result.html.contains("brotli works"));
assert!(!brotli_result.html.contains('\u{1f}'));
}
async fn flaky_502_handler(
State(counter): State<Arc<AtomicU32>>,
) -> impl axum::response::IntoResponse {
let n = counter.fetch_add(1, Ordering::SeqCst);
if n == 0 {
(StatusCode::BAD_GATEWAY, "upstream").into_response()
} else {
(
StatusCode::OK,
[(header::CONTENT_TYPE, "text/html; charset=utf-8")],
"<html><body><h1>retry success</h1></body></html>",
)
.into_response()
}
}
async fn always_502_handler() -> impl axum::response::IntoResponse {
(StatusCode::BAD_GATEWAY, "still broken")
}
async fn always_500_handler() -> impl axum::response::IntoResponse {
(StatusCode::INTERNAL_SERVER_ERROR, "permanent")
}
async fn spawn_retry_server() -> (String, Arc<AtomicU32>) {
let counter = Arc::new(AtomicU32::new(0));
let app = Router::new()
.route("/flaky", get(flaky_502_handler))
.with_state(counter.clone())
.route("/always502", get(always_502_handler))
.route("/always500", get(always_500_handler));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
(format!("http://{}", addr), counter)
}
#[tokio::test]
async fn http_fetcher_retries_502_then_succeeds() {
let (base, counter) = spawn_retry_server().await;
let fetcher = HttpFetcher::new("crw-test", None, false);
let result = fetcher
.fetch(&format!("{base}/flaky"), &HashMap::new(), None, tdl())
.await
.expect("retry should succeed");
assert_eq!(result.status_code, 200);
assert!(result.html.contains("retry success"));
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn http_fetcher_gives_up_after_one_retry_on_502() {
let (base, _counter) = spawn_retry_server().await;
let fetcher = HttpFetcher::new("crw-test", None, false);
let result = fetcher
.fetch(&format!("{base}/always502"), &HashMap::new(), None, tdl())
.await
.expect("non-2xx must be returned, not raised");
assert_eq!(result.status_code, 502);
}
#[tokio::test]
async fn http_fetcher_does_not_retry_500() {
let (base, _counter) = spawn_retry_server().await;
let fetcher = HttpFetcher::new("crw-test", None, false);
let result = fetcher
.fetch(&format!("{base}/always500"), &HashMap::new(), None, tdl())
.await
.expect("500 must surface as result");
assert_eq!(result.status_code, 500);
}
#[tokio::test]
async fn http_fetcher_retries_on_connect_refused() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let dead_addr = listener.local_addr().unwrap();
drop(listener);
let fetcher = HttpFetcher::new("crw-test", None, false);
let result = fetcher
.fetch(
&format!("http://{dead_addr}/x"),
&HashMap::new(),
None,
tdl(),
)
.await;
assert!(
result.is_err(),
"expected TargetUnreachable, got {result:?}"
);
}
async fn slow_handler() -> impl axum::response::IntoResponse {
tokio::time::sleep(Duration::from_secs(3)).await;
(StatusCode::OK, "<html>too late</html>")
}
async fn spawn_slow_server() -> String {
let app = Router::new().route("/slow", get(slow_handler));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
format!("http://{}", addr)
}
#[tokio::test]
async fn http_fetcher_enforces_deadline_against_slow_origin() {
let base = spawn_slow_server().await;
let fetcher = HttpFetcher::new("crw-test", None, false);
let started = std::time::Instant::now();
let result = fetcher
.fetch(
&format!("{base}/slow"),
&HashMap::new(),
None,
Deadline::now_plus(Duration::from_millis(300)),
)
.await;
let elapsed = started.elapsed();
assert!(result.is_err(), "expected Timeout, got {result:?}");
assert!(
elapsed < Duration::from_millis(1500),
"deadline not enforced — took {elapsed:?} for a 300ms budget"
);
}