use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arcly_http::core::engine::HttpMethod;
use arcly_http::http::Response;
use arcly_http::openapi::OpenApiInfo;
use arcly_http::prelude::*;
use arcly_http::web::interceptors::{Interceptor, NextHandler};
use futures::future::BoxFuture;
#[Module]
pub struct EmptyModule;
fn free_addr() -> String {
let l = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
let addr = l.local_addr().expect("local addr");
format!("127.0.0.1:{}", addr.port())
}
fn json_response(status: u16, body: &'static str) -> Response {
Response::builder()
.status(status)
.header("content-type", "application/json")
.body(axum::body::Body::from(body))
.expect("static response")
}
async fn boot(plugins: Vec<Box<dyn ArclyPlugin>>, config: LaunchConfig) -> String {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test port");
let addr = listener.local_addr().expect("local addr").to_string();
let base = format!("http://{addr}");
let server = tokio::spawn(async move {
let _ = App::launch_on_listener::<EmptyModule>(
listener,
OpenApiInfo::new("itest", "0"),
plugins,
config,
)
.await;
});
for _ in 0..1500 {
if server.is_finished() {
panic!("test server failed during boot — plugin on_init/on_start error?");
}
if reqwest::get(format!("{base}/openapi.json")).await.is_ok() {
return base;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("server did not become ready");
}
struct TagInterceptor;
impl Interceptor for TagInterceptor {
fn around(
&'static self,
ctx: arcly_http::RequestContext,
next: NextHandler,
) -> BoxFuture<'static, Response> {
Box::pin(async move {
let mut resp = next.run(ctx).await;
resp.headers_mut()
.insert("x-itest-tag", "1".parse().expect("static"));
resp
})
}
}
struct BlockFilter;
impl BoundaryFilter for BlockFilter {
fn before_body(
&'static self,
parts: &axum::http::request::Parts,
) -> std::ops::ControlFlow<Response> {
if parts.headers.contains_key("x-deny") {
return std::ops::ControlFlow::Break(json_response(403, r#"{"e":"denied"}"#));
}
std::ops::ControlFlow::Continue(())
}
}
struct TestPlugin {
with_interceptor: bool,
with_filter: bool,
slow_route: bool,
}
impl Default for TestPlugin {
fn default() -> Self {
Self {
with_interceptor: true,
with_filter: true,
slow_route: false,
}
}
}
impl ArclyPlugin for TestPlugin {
fn name(&self) -> &'static str {
"itest-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_get("/itest/ping", |_ctx| async {
json_response(200, r#""pong""#)
});
if self.slow_route {
ctx.add_get("/itest/slow", |_ctx| async {
tokio::time::sleep(Duration::from_secs(2)).await;
json_response(200, r#""late""#)
});
}
if self.with_interceptor {
ctx.register_global_interceptor(Box::leak(Box::new(TagInterceptor)));
}
if self.with_filter {
ctx.register_boundary_filter(Box::leak(Box::new(BlockFilter)));
}
Ok(())
})
}
fn on_start<'a>(
&'a self,
container: &'static FrozenDiContainer,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
container.get::<DynamicRouteTable>().mount(
axum::http::Method::GET,
"/itest/dyn",
|_ctx| async { json_response(200, r#""dynamic""#) },
);
Ok(())
})
}
}
#[tokio::test(flavor = "multi_thread")]
async fn plugin_route_interceptor_and_request_id() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::get(format!("{base}/itest/ping"))
.await
.expect("ping");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers().get("x-itest-tag").map(|v| v.as_bytes()),
Some(&b"1"[..])
);
assert!(resp.headers().contains_key("x-request-id"));
assert_eq!(resp.text().await.expect("body"), r#""pong""#);
}
#[tokio::test(flavor = "multi_thread")]
async fn inbound_request_id_is_honoured() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::Client::new()
.get(format!("{base}/itest/ping"))
.header("x-request-id", "gateway-rid-42")
.send()
.await
.expect("send");
assert_eq!(
resp.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok()),
Some("gateway-rid-42")
);
}
#[tokio::test(flavor = "multi_thread")]
async fn boundary_filter_rejects_before_body() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::Client::new()
.get(format!("{base}/itest/ping"))
.header("x-deny", "1")
.send()
.await
.expect("send");
assert_eq!(resp.status(), 403);
assert!(resp.headers().contains_key("x-request-id"));
}
#[tokio::test(flavor = "multi_thread")]
async fn governor_deadline_returns_504() {
let base = boot(
vec![Box::new(TestPlugin {
slow_route: true,
..Default::default()
})],
LaunchConfig::default().request_timeout(Duration::from_millis(200)),
)
.await;
let resp = reqwest::get(format!("{base}/itest/slow"))
.await
.expect("slow");
assert_eq!(resp.status(), 504);
}
#[tokio::test(flavor = "multi_thread")]
async fn admission_cap_sheds_with_503() {
let base = boot(
vec![Box::new(TestPlugin {
slow_route: true,
..Default::default()
})],
LaunchConfig::default()
.max_in_flight(1)
.request_timeout(Duration::from_secs(5)),
)
.await;
let base2 = base.clone();
let hog = tokio::spawn(async move { reqwest::get(format!("{base2}/itest/slow")).await });
let mut shed = None;
for _ in 0..100 {
let resp = reqwest::get(format!("{base}/itest/ping"))
.await
.expect("ping");
if resp.status() == 503 {
shed = Some(resp);
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let resp = shed.expect("cap must shed while the slot is occupied");
assert_eq!(
resp.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok()),
Some("1")
);
hog.abort();
}
#[tokio::test(flavor = "multi_thread")]
async fn cors_preflight_and_actual_request() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default().cors(CorsConfig::for_origins(["http://spa.test"])),
)
.await;
let client = reqwest::Client::new();
let pf = client
.request(reqwest::Method::OPTIONS, format!("{base}/itest/ping"))
.header("origin", "http://spa.test")
.header("access-control-request-method", "GET")
.send()
.await
.expect("preflight");
assert_eq!(pf.status(), 204);
assert_eq!(
pf.headers()
.get("access-control-allow-origin")
.and_then(|v| v.to_str().ok()),
Some("http://spa.test")
);
assert_eq!(
pf.headers()
.get("access-control-allow-credentials")
.and_then(|v| v.to_str().ok()),
Some("true")
);
let bad = client
.request(reqwest::Method::OPTIONS, format!("{base}/itest/ping"))
.header("origin", "http://evil.test")
.header("access-control-request-method", "GET")
.send()
.await
.expect("bad preflight");
assert_eq!(bad.status(), 403);
let ok = client
.get(format!("{base}/itest/ping"))
.header("origin", "http://spa.test")
.send()
.await
.expect("actual");
assert_eq!(
ok.headers()
.get("access-control-allow-origin")
.and_then(|v| v.to_str().ok()),
Some("http://spa.test")
);
}
#[tokio::test(flavor = "multi_thread")]
async fn dynamic_routes_mount_and_unmount_live() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::get(format!("{base}/_plugins/itest/dyn"))
.await
.expect("dyn");
assert_eq!(resp.status(), 200);
assert_eq!(resp.text().await.expect("body"), r#""dynamic""#);
let missing = reqwest::get(format!("{base}/_plugins/nope"))
.await
.expect("missing");
assert_eq!(missing.status(), 404);
}
#[tokio::test(flavor = "multi_thread")]
async fn openapi_is_served_as_static_json() {
let base = boot(vec![], LaunchConfig::default()).await;
let resp = reqwest::get(format!("{base}/openapi.json"))
.await
.expect("spec");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("application/json")
);
let spec: serde_json::Value = resp.json().await.expect("valid json");
assert_eq!(spec["info"]["title"], "itest");
}
#[tokio::test(flavor = "multi_thread")]
async fn route_collision_fails_launch_loudly() {
struct Collider(&'static str);
impl ArclyPlugin for Collider {
fn name(&self) -> &'static str {
self.0
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_route(HttpMethod::GET, "/clash", |_ctx| async {
json_response(200, "{}")
});
Ok(())
})
}
}
let err = App::launch_configured::<EmptyModule>(
&free_addr(),
OpenApiInfo::new("collide", "0"),
vec![Box::new(Collider("first")), Box::new(Collider("second"))],
LaunchConfig::default(),
)
.await
.expect_err("duplicate plugin route must fail launch");
let msg = err.to_string();
assert!(
msg.contains("second"),
"error names the offending plugin: {msg}"
);
assert!(msg.contains("/clash"), "error names the path: {msg}");
}
#[tokio::test(flavor = "multi_thread")]
async fn provider_from_plugin_resolves_in_handler() {
static SEEN: AtomicUsize = AtomicUsize::new(0);
struct Counter(AtomicUsize);
struct ProviderPlugin;
impl ArclyPlugin for ProviderPlugin {
fn name(&self) -> &'static str {
"provider-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.provide(Counter(AtomicUsize::new(0)));
ctx.add_get("/counted", |rctx| async move {
let c = rctx.inject::<Counter>();
let n = c.0.fetch_add(1, Ordering::Relaxed) + 1;
SEEN.store(n, Ordering::Relaxed);
json_response(200, "{}")
});
Ok(())
})
}
}
let base = boot(vec![Box::new(ProviderPlugin)], LaunchConfig::default()).await;
for _ in 0..3 {
assert_eq!(
reqwest::get(format!("{base}/counted"))
.await
.expect("req")
.status(),
200
);
}
assert_eq!(
SEEN.load(Ordering::Relaxed),
3,
"singleton state persisted across requests"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn handler_panic_becomes_500_with_request_id() {
struct PanicPlugin;
impl ArclyPlugin for PanicPlugin {
fn name(&self) -> &'static str {
"panic-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_get("/itest/panic", |_ctx| async {
if std::env::var("ITEST_NEVER").is_err() {
panic!("itest boom");
}
json_response(200, "{}")
});
Ok(())
})
}
}
let base = boot(vec![Box::new(PanicPlugin)], LaunchConfig::default()).await;
let resp = reqwest::get(format!("{base}/itest/panic"))
.await
.expect("panic route must still answer");
assert_eq!(resp.status(), 500);
assert!(resp.headers().contains_key("x-request-id"));
let ok = reqwest::get(format!("{base}/openapi.json"))
.await
.expect("alive");
assert_eq!(ok.status(), 200);
}
#[tokio::test(flavor = "multi_thread")]
async fn oversized_body_is_413_not_truncated() {
struct EchoPlugin;
impl ArclyPlugin for EchoPlugin {
fn name(&self) -> &'static str {
"echo-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_route(HttpMethod::POST, "/itest/echo", |rctx| async move {
let body = format!(r#"{{"len":{}}}"#, rctx.body().len());
Response::builder()
.status(200)
.body(axum::body::Body::from(body))
.expect("echo response")
});
Ok(())
})
}
}
let base = boot(
vec![Box::new(EchoPlugin)],
LaunchConfig::default().max_body_bytes(64),
)
.await;
let client = reqwest::Client::new();
let small = client
.post(format!("{base}/itest/echo"))
.body(vec![b'a'; 32])
.send()
.await
.expect("small body");
assert_eq!(small.status(), 200);
assert_eq!(small.text().await.expect("body"), r#"{"len":32}"#);
let big = client
.post(format!("{base}/itest/echo"))
.body(vec![b'a'; 4096])
.send()
.await
.expect("big body");
assert_eq!(big.status(), 413);
}
#[tokio::test(flavor = "multi_thread")]
async fn readyz_flips_503_while_draining_healthz_stays_green() {
struct ProbePlugin;
impl ArclyPlugin for ProbePlugin {
fn name(&self) -> &'static str {
"probe-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_get(
"/healthz",
arcly_http::observability::health::healthz_handler(),
);
ctx.add_get(
"/readyz",
arcly_http::observability::health::readyz_handler(),
);
Ok(())
})
}
}
let base = boot(vec![Box::new(ProbePlugin)], LaunchConfig::default()).await;
assert_eq!(
reqwest::get(format!("{base}/readyz"))
.await
.expect("ready")
.status(),
200
);
arcly_http::observability::health::set_draining(true);
let draining = reqwest::get(format!("{base}/readyz"))
.await
.expect("draining");
assert_eq!(draining.status(), 503);
assert_eq!(
draining.text().await.expect("body"),
r#"{"status":"draining"}"#
);
assert_eq!(
reqwest::get(format!("{base}/healthz"))
.await
.expect("live")
.status(),
200
);
arcly_http::observability::health::set_draining(false);
}
#[tokio::test(flavor = "multi_thread")]
async fn docs_can_be_disabled() {
let base = boot(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default().expose_docs(false),
)
.await;
assert_eq!(
reqwest::get(format!("{base}/docs"))
.await
.expect("docs")
.status(),
404
);
assert_eq!(
reqwest::get(format!("{base}/openapi.json"))
.await
.expect("spec")
.status(),
404
);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_request_builds_production_shaped_context() {
use arcly_http::testing::TestRequest;
use arcly_http::web::tenant::{TenantConfig, TenantRegistry, TenantStrategy};
let ctx = TestRequest::post("/orders")
.query("expand=items")
.header("x-tenant-id", "acme")
.header(
"traceparent",
"00-0123456789abcdef0123456789abcdef-00f067aa0ba902b7-01",
)
.json(&serde_json::json!({"sku": "X-1", "qty": 2}))
.claims(serde_json::json!({"sub": "42", "role": "admin"}))
.provide(TenantRegistry::new(
TenantStrategy::header("x-tenant-id"),
vec![TenantConfig::new("acme", "Acme", "acme")],
None,
))
.build()
.await;
assert_eq!(ctx.path(), "/orders");
assert_eq!(ctx.query_string(), Some("expand=items"));
let body: serde_json::Value = serde_json::from_slice(ctx.body()).expect("json body");
assert_eq!(body["qty"], 2);
assert_eq!(
ctx.claims()
.and_then(|c| c.get("role"))
.and_then(|v| v.as_str()),
Some("admin")
);
assert_eq!(ctx.tenant().map(|t| t.id.as_str()), Some("acme"));
assert_eq!(ctx.trace_id_hex(), "0123456789abcdef0123456789abcdef");
}
#[tokio::test(flavor = "multi_thread")]
async fn test_server_boots_and_serves() {
use arcly_http::testing::TestServer;
let server = TestServer::launch::<EmptyModule>(
vec![Box::new(TestPlugin::default())],
LaunchConfig::default(),
)
.await;
let resp = reqwest::get(format!("{}/itest/ping", server.base_url))
.await
.expect("ping via TestServer");
assert_eq!(resp.status(), 200);
}
pub struct TwinController;
#[Controller("/twin")]
impl TwinController {
#[Get("/", summary("Twin root"))]
async fn root() -> Json<serde_json::Value> {
Json(serde_json::json!({"ok": true}))
}
}
#[Module(controllers(TwinController))]
pub struct TwinModule;
#[tokio::test(flavor = "multi_thread")]
async fn prefixed_root_serves_both_slash_forms_and_canonical_spec() {
use arcly_http::testing::TestServer;
let server = TestServer::launch::<TwinModule>(vec![], LaunchConfig::default()).await;
for path in ["/twin", "/twin/"] {
let resp = reqwest::get(format!("{}{path}", server.base_url))
.await
.expect(path);
assert_eq!(resp.status(), 200, "{path} must be served");
}
let spec: serde_json::Value = reqwest::get(format!("{}/openapi.json", server.base_url))
.await
.expect("spec")
.json()
.await
.expect("json");
let paths = spec["paths"].as_object().expect("paths object");
assert!(paths.contains_key("/twin"), "canonical path in spec");
assert!(!paths.contains_key("/twin/"), "no trailing-slash duplicate");
}
#[tokio::test(flavor = "multi_thread")]
async fn adaptive_shedding_sheds_under_latency_pressure() {
struct LagPlugin;
impl ArclyPlugin for LagPlugin {
fn name(&self) -> &'static str {
"lag-plugin"
}
fn on_init<'a>(
&'a mut self,
ctx: &'a mut ArclyPluginContext,
) -> BoxFuture<'a, Result<(), PluginError>> {
Box::pin(async move {
ctx.add_get("/itest/lag", |_ctx| async {
tokio::time::sleep(Duration::from_millis(25)).await;
json_response(200, "{}")
});
Ok(())
})
}
}
let base = boot(
vec![Box::new(LagPlugin)],
LaunchConfig::default().adaptive_shed_target(Duration::from_millis(2)),
)
.await;
let mut oks = 0;
let mut sheds = 0;
for _ in 0..40 {
let resp = reqwest::get(format!("{base}/itest/lag"))
.await
.expect("lag");
match resp.status().as_u16() {
200 => oks += 1,
503 => {
assert!(resp.headers().contains_key("retry-after"));
sheds += 1;
}
other => panic!("unexpected status {other}"),
}
}
assert!(oks >= 1, "the 90% cap must let probes through (oks={oks})");
assert!(
sheds >= 5,
"sustained overload must shed a meaningful slice (sheds={sheds}, oks={oks})"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn runtime_strings_flow_into_spec_and_tenant_strategy() {
use arcly_http::testing::TestRequest;
use arcly_http::web::tenant::{TenantConfig, TenantRegistry, TenantStrategy};
let service = std::env::var("ITEST_SERVICE").unwrap_or_else(|_| "runtime-svc".to_owned());
let spec = arcly_http::openapi::build_spec(
&OpenApiInfo::new(service.clone(), String::from("9.9.9"))
.server(format!("https://{service}.example.com"), "prod"),
);
assert_eq!(spec["info"]["title"], "runtime-svc");
assert_eq!(spec["servers"][0]["url"], "https://runtime-svc.example.com");
let header_name = String::from("x-tenant-id");
let ctx = TestRequest::get("/anything")
.header("x-tenant-id", "acme")
.provide(TenantRegistry::new(
TenantStrategy::header(header_name),
vec![TenantConfig::new("acme", "Acme", "acme")],
None,
))
.build()
.await;
assert_eq!(ctx.tenant().map(|t| t.id.as_str()), Some("acme"));
}