use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arcly_http::core::engine::HttpMethod;
use arcly_http::http::Response;
use arcly_http::openapi::OpenApiInfo;
use arcly_http::prelude::*;
use arcly_http::web::interceptors::{Interceptor, NextHandler};
use futures::future::BoxFuture;
/// Unit struct marked with `#[Module]`; it is the module type parameter passed
/// to every app launch in this file.
#[Module]
pub struct EmptyModule;
/// Returns a `127.0.0.1:<port>` address whose port the OS just handed out.
///
/// The probe listener is closed before this function returns, so the port is
/// only *likely* to still be free when the caller binds it.
///
/// # Panics
///
/// Panics if the loopback listener cannot be bound or its address cannot be
/// read back.
fn free_addr() -> String {
    // Binding to port 0 asks the OS to pick an unused port.
    let port = std::net::TcpListener::bind("127.0.0.1:0")
        .expect("bind probe")
        .local_addr()
        .expect("local addr")
        .port();
    format!("127.0.0.1:{port}")
}
/// Builds a response with the given status code, a
/// `content-type: application/json` header and `body` as its payload.
///
/// # Panics
///
/// Panics with "static response" if the builder rejects the parts.
fn json_response(status: u16, body: &'static str) -> Response {
    let payload = axum::body::Body::from(body);
    let builder = Response::builder()
        .status(status)
        .header("content-type", "application/json");
    builder.body(payload).expect("static response")
}
/// Launches an app for [`EmptyModule`] on a fresh loopback port in a
/// background task and returns its base URL (`http://127.0.0.1:<port>`) once
/// the server answers HTTP.
///
/// Readiness is probed by requesting `/openapi.json` up to 100 times, 20 ms
/// apart. Any HTTP response counts as ready, whatever its status code, so this
/// also works when the spec route itself is not served.
///
/// If the launch returns an error, it is written to stderr so a failing test
/// shows the real cause rather than only the readiness timeout.
///
/// # Panics
///
/// Panics with "server did not become ready" if no probe succeeds.
async fn boot(plugins: Vec<Box<dyn ArclyPlugin>>, config: LaunchConfig) -> String {
    let addr = free_addr();
    let base = format!("http://{addr}");

    tokio::spawn(async move {
        let outcome = App::launch_configured::<EmptyModule>(
            &addr,
            OpenApiInfo {
                title: "itest",
                version: "0",
                ..Default::default()
            },
            plugins,
            config,
        )
        .await;
        // The task is detached, so nobody observes its result; report a
        // failed launch here instead of dropping the error silently.
        if let Err(err) = outcome {
            eprintln!("itest server on {addr} failed to launch: {err}");
        }
    });

    for _ in 0..100 {
        if reqwest::get(format!("{base}/openapi.json")).await.is_ok() {
            return base;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    panic!("server did not become ready");
}
/// Interceptor that stamps every response it wraps with `x-itest-tag: 1`.
struct TagInterceptor;

impl Interceptor for TagInterceptor {
    /// Runs the rest of the chain first, then adds the marker header to the
    /// response it produced.
    fn around(
        &'static self,
        ctx: arcly_http::RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        Box::pin(async move {
            let mut response = next.run(ctx).await;
            let tag = "1".parse().expect("static");
            response.headers_mut().insert("x-itest-tag", tag);
            response
        })
    }
}
/// Boundary filter that rejects any request carrying an `x-deny` header.
struct BlockFilter;

impl BoundaryFilter for BlockFilter {
    /// Inspects only the request head: breaks with a 403 JSON response when
    /// `x-deny` is present, otherwise lets the request continue.
    fn before_body(
        &'static self,
        parts: &axum::http::request::Parts,
    ) -> std::ops::ControlFlow<Response> {
        use std::ops::ControlFlow::{Break, Continue};

        if parts.headers.contains_key("x-deny") {
            Break(json_response(403, r#"{"e":"denied"}"#))
        } else {
            Continue(())
        }
    }
}
/// Switches that decide what the shared test plugin registers.
struct TestPlugin {
    /// Register the global interceptor.
    with_interceptor: bool,
    /// Register the boundary filter.
    with_filter: bool,
    /// Also register the slow route.
    slow_route: bool,
}

impl Default for TestPlugin {
    /// Interceptor and filter on, slow route off.
    fn default() -> Self {
        let (with_interceptor, with_filter, slow_route) = (true, true, false);
        Self {
            with_interceptor,
            with_filter,
            slow_route,
        }
    }
}
impl ArclyPlugin for TestPlugin {
    /// Name this plugin reports to the framework.
    fn name(&self) -> &'static str {
        "itest-plugin"
    }

    /// Registers `GET /itest/ping` unconditionally, then, depending on the
    /// plugin's flags, `GET /itest/slow` (answers after a 2 s sleep), the
    /// global [`TagInterceptor`] and the [`BlockFilter`] boundary filter.
    fn on_init<'a>(
        &'a mut self,
        ctx: &'a mut ArclyPluginContext,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        // Copy the flags out up front so the future only has to hold `ctx`.
        let Self {
            with_interceptor,
            with_filter,
            slow_route,
        } = *self;

        Box::pin(async move {
            ctx.add_get("/itest/ping", |_ctx| async {
                json_response(200, r#""pong""#)
            });

            if slow_route {
                ctx.add_get("/itest/slow", |_ctx| async {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    json_response(200, r#""late""#)
                });
            }

            // Both registrations take leaked boxes, i.e. `'static` references.
            if with_interceptor {
                ctx.register_global_interceptor(Box::leak(Box::new(TagInterceptor)));
            }
            if with_filter {
                ctx.register_boundary_filter(Box::leak(Box::new(BlockFilter)));
            }

            Ok(())
        })
    }

    /// Mounts `GET /itest/dyn` on the `DynamicRouteTable` resolved from the
    /// container.
    fn on_start<'a>(
        &'a self,
        container: &'static FrozenDiContainer,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        Box::pin(async move {
            let routes = container.get::<DynamicRouteTable>();
            routes.mount(
                axum::http::Method::GET,
                "/itest/dyn",
                |_ctx| async { json_response(200, r#""dynamic""#) },
            );
            Ok(())
        })
    }
}
#[tokio::test(flavor = "multi_thread")]
async fn plugin_route_interceptor_and_request_id() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::get(format!("{base}/itest/ping"))
.await
.expect("ping");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers().get("x-itest-tag").map(|v| v.as_bytes()),
Some(&b"1"[..])
);
assert!(resp.headers().contains_key("x-request-id"));
assert_eq!(resp.text().await.expect("body"), r#""pong""#);
}
/// An `x-request-id` supplied by the caller is echoed back unchanged on the
/// response.
#[tokio::test(flavor = "multi_thread")]
async fn inbound_request_id_is_honoured() {
    let config = LaunchConfig::default();
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;

    let request = reqwest::Client::new()
        .get(format!("{base}/itest/ping"))
        .header("x-request-id", "gateway-rid-42");
    let resp = request.send().await.expect("send");

    let echoed = resp
        .headers()
        .get("x-request-id")
        .and_then(|v| v.to_str().ok());
    assert_eq!(echoed, Some("gateway-rid-42"));
}
/// A request carrying `x-deny` is answered 403 by the boundary filter, and
/// the rejection still carries an `x-request-id` header.
#[tokio::test(flavor = "multi_thread")]
async fn boundary_filter_rejects_before_body() {
    let config = LaunchConfig::default();
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;

    let denied = reqwest::Client::new()
        .get(format!("{base}/itest/ping"))
        .header("x-deny", "1");
    let resp = denied.send().await.expect("send");

    assert_eq!(resp.status(), 403);
    let has_request_id = resp.headers().contains_key("x-request-id");
    assert!(has_request_id);
}
/// With a 200 ms request timeout, the route that sleeps for 2 s is answered
/// with 504.
#[tokio::test(flavor = "multi_thread")]
async fn governor_deadline_returns_504() {
    let plugin = TestPlugin {
        slow_route: true,
        ..Default::default()
    };
    let config = LaunchConfig {
        request_timeout: Duration::from_millis(200),
        ..Default::default()
    };
    let base = boot(vec![Box::new(plugin)], config).await;

    let url = format!("{base}/itest/slow");
    let resp = reqwest::get(url).await.expect("slow");
    assert_eq!(resp.status(), 504);
}
/// With `max_in_flight` set to 1, a request that arrives while the slow route
/// is still being served is answered 503 with `retry-after: 1`.
#[tokio::test(flavor = "multi_thread")]
async fn admission_cap_sheds_with_503() {
    let plugin = TestPlugin {
        slow_route: true,
        ..Default::default()
    };
    let config = LaunchConfig {
        max_in_flight: 1,
        request_timeout: Duration::from_secs(5),
        ..Default::default()
    };
    let base = boot(vec![Box::new(plugin)], config).await;

    // Occupy the only slot with the 2 s route, then give that request time to
    // reach the server before probing.
    let slow_url = format!("{base}/itest/slow");
    let hog = tokio::spawn(async move { reqwest::get(slow_url).await });
    tokio::time::sleep(Duration::from_millis(150)).await;

    let resp = reqwest::get(format!("{base}/itest/ping"))
        .await
        .expect("ping");
    assert_eq!(resp.status(), 503);

    let retry_after = resp
        .headers()
        .get("retry-after")
        .and_then(|v| v.to_str().ok());
    assert_eq!(retry_after, Some("1"));

    hog.abort();
}
/// CORS configured for a single origin: that origin's preflight gets 204 with
/// allow-origin and allow-credentials headers, another origin's preflight
/// gets 403, and an actual GET from the allowed origin carries allow-origin.
#[tokio::test(flavor = "multi_thread")]
async fn cors_preflight_and_actual_request() {
    /// Sends an `OPTIONS` preflight asking permission for a `GET` from `origin`.
    async fn preflight(
        client: &reqwest::Client,
        url: &str,
        origin: &str,
    ) -> reqwest::Result<reqwest::Response> {
        client
            .request(reqwest::Method::OPTIONS, url)
            .header("origin", origin)
            .header("access-control-request-method", "GET")
            .send()
            .await
    }

    /// Reads a response header as text, if it is present and valid.
    fn header<'r>(resp: &'r reqwest::Response, name: &str) -> Option<&'r str> {
        resp.headers().get(name).and_then(|v| v.to_str().ok())
    }

    let config = LaunchConfig {
        cors: Some(CorsConfig::for_origins(["http://spa.test"])),
        ..Default::default()
    };
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;
    let ping = format!("{base}/itest/ping");
    let client = reqwest::Client::new();

    // Preflight from the configured origin.
    let pf = preflight(&client, &ping, "http://spa.test")
        .await
        .expect("preflight");
    assert_eq!(pf.status(), 204);
    assert_eq!(
        header(&pf, "access-control-allow-origin"),
        Some("http://spa.test")
    );
    assert_eq!(header(&pf, "access-control-allow-credentials"), Some("true"));

    // Preflight from an origin that is not configured.
    let bad = preflight(&client, &ping, "http://evil.test")
        .await
        .expect("bad preflight");
    assert_eq!(bad.status(), 403);

    // Actual request from the configured origin.
    let ok = client
        .get(ping.as_str())
        .header("origin", "http://spa.test")
        .send()
        .await
        .expect("actual");
    assert_eq!(
        header(&ok, "access-control-allow-origin"),
        Some("http://spa.test")
    );
}
/// The route mounted as `/itest/dyn` during plugin start is reachable at
/// `/_plugins/itest/dyn`, while an unknown path under `/_plugins` is 404.
///
/// NOTE(review): the test name mentions unmounting, but nothing is unmounted
/// here; confirm whether that coverage is intended.
#[tokio::test(flavor = "multi_thread")]
async fn dynamic_routes_mount_and_unmount_live() {
    let config = LaunchConfig::default();
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;

    let mounted_url = format!("{base}/_plugins/itest/dyn");
    let resp = reqwest::get(mounted_url).await.expect("dyn");
    assert_eq!(resp.status(), 200);
    let body = resp.text().await.expect("body");
    assert_eq!(body, r#""dynamic""#);

    let unknown_url = format!("{base}/_plugins/nope");
    let missing = reqwest::get(unknown_url).await.expect("missing");
    assert_eq!(missing.status(), 404);
}
/// `/openapi.json` answers 200 with exactly `content-type: application/json`
/// and a JSON document whose `info.title` is the title passed at launch.
#[tokio::test(flavor = "multi_thread")]
async fn openapi_is_served_as_static_json() {
    let base = boot(vec![], LaunchConfig::default()).await;

    let url = format!("{base}/openapi.json");
    let resp = reqwest::get(url).await.expect("spec");
    assert_eq!(resp.status(), 200);

    let content_type = resp
        .headers()
        .get("content-type")
        .and_then(|v| v.to_str().ok());
    assert_eq!(content_type, Some("application/json"));

    let spec = resp
        .json::<serde_json::Value>()
        .await
        .expect("valid json");
    assert_eq!(spec["info"]["title"], "itest");
}
/// Two plugins registering the same `GET /clash` route make the launch return
/// an error whose message names the second plugin and the path.
#[tokio::test(flavor = "multi_thread")]
async fn route_collision_fails_launch_loudly() {
    /// Plugin that registers `GET /clash` under whatever name it is given.
    struct Collider(&'static str);

    impl ArclyPlugin for Collider {
        fn name(&self) -> &'static str {
            let Collider(name) = self;
            name
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.add_route(HttpMethod::GET, "/clash", |_ctx| async {
                    json_response(200, "{}")
                });
                Ok(())
            })
        }
    }

    let addr = free_addr();
    let info = OpenApiInfo {
        title: "collide",
        version: "0",
        ..Default::default()
    };
    let plugins: Vec<Box<dyn ArclyPlugin>> =
        vec![Box::new(Collider("first")), Box::new(Collider("second"))];

    // Launch directly instead of through `boot`: the returned error is what
    // this test inspects.
    let launch = App::launch_configured::<EmptyModule>(
        &addr,
        info,
        plugins,
        LaunchConfig::default(),
    )
    .await;
    let err = launch.expect_err("duplicate plugin route must fail launch");

    let msg = err.to_string();
    assert!(
        msg.contains("second"),
        "error names the offending plugin: {msg}"
    );
    assert!(msg.contains("/clash"), "error names the path: {msg}");
}
/// A value provided by a plugin during init can be injected in a handler, and
/// the same instance is seen by successive requests: after three hits the
/// counter reads 3.
#[tokio::test(flavor = "multi_thread")]
async fn provider_from_plugin_resolves_in_handler() {
    // Last counter value observed by the handler, readable from the test body.
    static SEEN: AtomicUsize = AtomicUsize::new(0);

    /// Provided value holding a per-instance hit counter.
    struct Counter(AtomicUsize);

    struct ProviderPlugin;

    impl ArclyPlugin for ProviderPlugin {
        fn name(&self) -> &'static str {
            "provider-plugin"
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.provide(Counter(AtomicUsize::new(0)));
                ctx.add_get("/counted", |rctx| async move {
                    // `fetch_add` returns the previous value, hence the `+ 1`.
                    let hits = rctx
                        .inject::<Counter>()
                        .0
                        .fetch_add(1, Ordering::Relaxed)
                        + 1;
                    SEEN.store(hits, Ordering::Relaxed);
                    json_response(200, "{}")
                });
                Ok(())
            })
        }
    }

    let base = boot(vec![Box::new(ProviderPlugin)], LaunchConfig::default()).await;
    let url = format!("{base}/counted");

    for _ in 0..3 {
        let status = reqwest::get(url.as_str()).await.expect("req").status();
        assert_eq!(status, 200);
    }

    let seen = SEEN.load(Ordering::Relaxed);
    assert_eq!(seen, 3, "singleton state persisted across requests");
}
/// A panicking handler is answered with 500 plus an `x-request-id` header,
/// and the server keeps serving other requests afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn handler_panic_becomes_500_with_request_id() {
    struct PanicPlugin;

    impl ArclyPlugin for PanicPlugin {
        fn name(&self) -> &'static str {
            "panic-plugin"
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.add_get("/itest/panic", |_ctx| async {
                    // Panics whenever ITEST_NEVER is not set to a readable
                    // value; the response below is only reached otherwise.
                    let never_set = std::env::var("ITEST_NEVER").is_err();
                    if never_set {
                        panic!("itest boom");
                    }
                    json_response(200, "{}")
                });
                Ok(())
            })
        }
    }

    let base = boot(vec![Box::new(PanicPlugin)], LaunchConfig::default()).await;

    let panic_url = format!("{base}/itest/panic");
    let resp = reqwest::get(panic_url)
        .await
        .expect("panic route must still answer");
    assert_eq!(resp.status(), 500);
    assert!(resp.headers().contains_key("x-request-id"));

    // The server must still answer after the panic.
    let spec_url = format!("{base}/openapi.json");
    let ok = reqwest::get(spec_url).await.expect("alive");
    assert_eq!(ok.status(), 200);
}
/// With `max_body_bytes` set to 64, a 32-byte body reaches the handler intact
/// and a 4096-byte body is answered with 413.
#[tokio::test(flavor = "multi_thread")]
async fn oversized_body_is_413_not_truncated() {
    /// Plugin whose `POST /itest/echo` reports the length of the body it saw.
    struct EchoPlugin;

    impl ArclyPlugin for EchoPlugin {
        fn name(&self) -> &'static str {
            "echo-plugin"
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.add_route(HttpMethod::POST, "/itest/echo", |rctx| async move {
                    let len = rctx.body().len();
                    let body = format!(r#"{{"len":{len}}}"#);
                    Response::builder()
                        .status(200)
                        .body(axum::body::Body::from(body))
                        .expect("echo response")
                });
                Ok(())
            })
        }
    }

    let config = LaunchConfig {
        max_body_bytes: 64,
        ..Default::default()
    };
    let base = boot(vec![Box::new(EchoPlugin)], config).await;
    let echo_url = format!("{base}/itest/echo");
    let client = reqwest::Client::new();

    // Under the limit: the handler sees all 32 bytes.
    let small = client
        .post(echo_url.as_str())
        .body(vec![b'a'; 32])
        .send()
        .await
        .expect("small body");
    assert_eq!(small.status(), 200);
    let echoed = small.text().await.expect("body");
    assert_eq!(echoed, r#"{"len":32}"#);

    // Over the limit: rejected rather than handed to the handler.
    let big = client
        .post(echo_url.as_str())
        .body(vec![b'a'; 4096])
        .send()
        .await
        .expect("big body");
    assert_eq!(big.status(), 413);
}
/// `/readyz` answers 200 normally and 503 with `{"status":"draining"}` once
/// draining is switched on, while `/healthz` keeps answering 200.
///
/// Draining is toggled through a free function with no server handle, so the
/// flag is shared state. It is cleared by a drop guard so that a failed
/// assertion cannot leave it set for other tests in this binary.
#[tokio::test(flavor = "multi_thread")]
async fn readyz_flips_503_while_draining_healthz_stays_green() {
    /// Plugin exposing the framework's health handlers on `/healthz` and
    /// `/readyz`.
    struct ProbePlugin;

    impl ArclyPlugin for ProbePlugin {
        fn name(&self) -> &'static str {
            "probe-plugin"
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.add_get(
                    "/healthz",
                    arcly_http::observability::health::healthz_handler(),
                );
                ctx.add_get(
                    "/readyz",
                    arcly_http::observability::health::readyz_handler(),
                );
                Ok(())
            })
        }
    }

    /// Switches draining back off when dropped, including during the unwind
    /// of a failed assertion.
    struct DrainReset;

    impl Drop for DrainReset {
        fn drop(&mut self) {
            arcly_http::observability::health::set_draining(false);
        }
    }

    let base = boot(vec![Box::new(ProbePlugin)], LaunchConfig::default()).await;

    assert_eq!(
        reqwest::get(format!("{base}/readyz"))
            .await
            .expect("ready")
            .status(),
        200
    );

    arcly_http::observability::health::set_draining(true);
    // Held until the end of the test; its drop undoes the line above.
    let _reset = DrainReset;

    let draining = reqwest::get(format!("{base}/readyz"))
        .await
        .expect("draining");
    assert_eq!(draining.status(), 503);
    assert_eq!(
        draining.text().await.expect("body"),
        r#"{"status":"draining"}"#
    );

    assert_eq!(
        reqwest::get(format!("{base}/healthz"))
            .await
            .expect("live")
            .status(),
        200
    );
}
/// With `expose_docs` off, both `/docs` and `/openapi.json` answer 404.
#[tokio::test(flavor = "multi_thread")]
async fn docs_can_be_disabled() {
    let config = LaunchConfig {
        expose_docs: false,
        ..Default::default()
    };
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;

    // Each entry is a path plus the panic label used if the request fails.
    for (path, label) in [("/docs", "docs"), ("/openapi.json", "spec")] {
        let status = reqwest::get(format!("{base}{path}"))
            .await
            .expect(label)
            .status();
        assert_eq!(status, 404);
    }
}
/// `TestRequest` builds a request context without a server: path, query,
/// JSON body, claims, tenant and the trace id from `traceparent` are all
/// readable from the result.
#[tokio::test(flavor = "multi_thread")]
async fn test_request_builds_production_shaped_context() {
    use arcly_http::testing::TestRequest;
    use arcly_http::web::tenant::{TenantConfig, TenantId, TenantRegistry, TenantStrategy};

    // One known tenant, selected by the `x-tenant-id` header.
    let acme = TenantConfig {
        id: TenantId::new("acme"),
        display_name: "Acme".into(),
        datasource: "acme".into(),
    };
    let tenants = TenantRegistry::new(TenantStrategy::Header("x-tenant-id"), vec![acme], None);

    let payload = serde_json::json!({"sku": "X-1", "qty": 2});
    let claims = serde_json::json!({"sub": "42", "role": "admin"});

    let ctx = TestRequest::post("/orders")
        .query("expand=items")
        .header("x-tenant-id", "acme")
        .header(
            "traceparent",
            "00-0123456789abcdef0123456789abcdef-00f067aa0ba902b7-01",
        )
        .json(&payload)
        .claims(claims)
        .provide(tenants)
        .build()
        .await;

    assert_eq!(ctx.path(), "/orders");
    assert_eq!(ctx.query_string(), Some("expand=items"));

    let body: serde_json::Value = serde_json::from_slice(ctx.body()).expect("json body");
    assert_eq!(body["qty"], 2);

    let role = ctx
        .claims()
        .and_then(|c| c.get("role"))
        .and_then(|v| v.as_str());
    assert_eq!(role, Some("admin"));

    assert_eq!(ctx.tenant().map(|t| t.id.as_str()), Some("acme"));
    // The expected value is the second dash-separated field of `traceparent`.
    assert_eq!(ctx.trace_id_hex(), "0123456789abcdef0123456789abcdef");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_server_boots_and_serves() {
use arcly_http::testing::TestServer;
let server = TestServer::launch::<EmptyModule>(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::get(format!("{}/itest/ping", server.base_url))
.await
.expect("ping via TestServer");
assert_eq!(resp.status(), 200);
}