use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arcly_http::core::engine::HttpMethod;
use arcly_http::http::Response;
use arcly_http::openapi::OpenApiInfo;
use arcly_http::prelude::*;
use arcly_http::web::interceptors::{Interceptor, NextHandler};
use futures::future::BoxFuture;
/// Unit struct annotated with `#[Module]`, used as the module type parameter
/// for `App::launch_configured` throughout this file.
///
/// NOTE(review): presumably declares no controllers or providers of its own
/// (hence the name), so every route exercised here comes from a plugin —
/// confirm against the `#[Module]` macro.
#[Module]
pub struct EmptyModule;
/// Returns a `127.0.0.1:<port>` address whose port was free a moment ago.
///
/// A throwaway listener is bound to port 0 so the OS picks the port; the
/// listener is dropped before this function returns, so the port is only
/// *probably* still free when the caller binds it. Another socket could in
/// principle claim it in between.
///
/// # Panics
///
/// Panics if the probe listener cannot be bound or its local address cannot
/// be read.
fn free_addr() -> String {
    let port = std::net::TcpListener::bind("127.0.0.1:0")
        .expect("bind probe")
        .local_addr()
        .expect("local addr")
        .port();
    format!("127.0.0.1:{port}")
}
/// Builds a response with the given status code, a
/// `content-type: application/json` header and `body` as its payload.
///
/// # Panics
///
/// Panics with "static response" if the builder rejects the parts.
fn json_response(status: u16, body: &'static str) -> Response {
    let payload = axum::body::Body::from(body);
    let builder = Response::builder()
        .status(status)
        .header("content-type", "application/json");
    builder.body(payload).expect("static response")
}
/// Launches an app with `plugins` and `config` on a background task and
/// returns its base URL (`http://127.0.0.1:<port>`) once it answers HTTP.
///
/// Readiness is probed by requesting `/openapi.json` up to 100 times with a
/// 20 ms pause between attempts; any completed HTTP exchange counts as ready,
/// regardless of status code.
///
/// # Panics
///
/// Panics if the server task exits before the server becomes reachable (the
/// launch error, if any, is printed to stderr first), or if the server is
/// still unreachable after all probe attempts.
async fn boot(plugins: Vec<Box<dyn ArclyPlugin>>, config: LaunchConfig) -> String {
    let addr = free_addr();
    let base = format!("http://{addr}");

    let server = tokio::spawn(async move {
        let info = OpenApiInfo {
            title: "itest",
            version: "0",
            ..Default::default()
        };
        // Surface launch failures instead of discarding them: without this a
        // bad config or an occupied port is indistinguishable from a slow
        // start-up.
        if let Err(err) =
            App::launch_configured::<EmptyModule>(&addr, info, plugins, config).await
        {
            eprintln!("itest server on {addr} failed to launch: {err}");
        }
    });

    // The probe URL never changes, so build it once.
    let probe = format!("{base}/openapi.json");
    for _ in 0..100 {
        if reqwest::get(&probe).await.is_ok() {
            return base;
        }
        // A finished task means the server will never come up; fail right
        // away rather than burning the remaining probe attempts.
        assert!(
            !server.is_finished(),
            "server task exited before becoming ready (launch error, if any, is on stderr)"
        );
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    panic!("server did not become ready");
}
/// Interceptor that runs the rest of the chain and then stamps the response
/// with an `x-itest-tag: 1` header.
struct TagInterceptor;

impl Interceptor for TagInterceptor {
    /// Awaits `next.run(ctx)` and inserts the marker header on its response.
    fn around(
        &'static self,
        ctx: arcly_http::RequestContext,
        next: NextHandler,
    ) -> BoxFuture<'static, Response> {
        let tagged = async move {
            let mut response = next.run(ctx).await;
            let tag = "1".parse().expect("static");
            response.headers_mut().insert("x-itest-tag", tag);
            response
        };
        Box::pin(tagged)
    }
}
/// Boundary filter that rejects any request carrying an `x-deny` header.
struct BlockFilter;

impl BoundaryFilter for BlockFilter {
    /// Breaks with a 403 JSON response (`{"e":"denied"}`) when the request
    /// head contains `x-deny`; otherwise lets the request continue.
    fn before_body(
        &'static self,
        parts: &axum::http::request::Parts,
    ) -> std::ops::ControlFlow<Response> {
        use std::ops::ControlFlow::{Break, Continue};

        if parts.headers.contains_key("x-deny") {
            Break(json_response(403, r#"{"e":"denied"}"#))
        } else {
            Continue(())
        }
    }
}
/// Plugin used by the tests in this file; the flags select what its
/// `on_init` hook registers in addition to the always-present
/// `GET /itest/ping` route.
struct TestPlugin {
/// Register a leaked `TagInterceptor` as a global interceptor.
with_interceptor: bool,
/// Register a leaked `BlockFilter` as a boundary filter.
with_filter: bool,
/// Also add `GET /itest/slow`, whose handler sleeps for two seconds.
slow_route: bool,
}
impl Default for TestPlugin {
fn default() -> Self {
Self {
with_interceptor: true,
with_filter: true,
slow_route: false,
}
}
}
impl ArclyPlugin for TestPlugin {
    /// Name reported for this plugin.
    fn name(&self) -> &'static str {
        "itest-plugin"
    }

    /// Registers `GET /itest/ping` and, depending on the flags, the slow
    /// route, the global `TagInterceptor` and the `BlockFilter`.
    ///
    /// The interceptor and filter are leaked with `Box::leak` to obtain the
    /// `'static` references handed to the registration calls.
    fn on_init<'a>(
        &'a mut self,
        ctx: &'a mut ArclyPluginContext,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        // Copy the flags out up front so the future only captures `ctx`.
        let wants_slow = self.slow_route;
        let wants_interceptor = self.with_interceptor;
        let wants_filter = self.with_filter;

        Box::pin(async move {
            ctx.add_get("/itest/ping", |_req| async {
                json_response(200, r#""pong""#)
            });

            if wants_slow {
                ctx.add_get("/itest/slow", |_req| async {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    json_response(200, r#""late""#)
                });
            }

            if wants_interceptor {
                ctx.register_global_interceptor(Box::leak(Box::new(TagInterceptor)));
            }

            if wants_filter {
                ctx.register_boundary_filter(Box::leak(Box::new(BlockFilter)));
            }

            Ok(())
        })
    }

    /// Mounts `GET /itest/dyn` on the container's `DynamicRouteTable`.
    fn on_start<'a>(
        &'a self,
        container: &'static FrozenDiContainer,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        use axum::http::Method;

        Box::pin(async move {
            container
                .get::<DynamicRouteTable>()
                .mount(Method::GET, "/itest/dyn", |_req| async {
                    json_response(200, r#""dynamic""#)
                });
            Ok(())
        })
    }
}
/// A plugin-registered route answers 200 with body `"pong"`, carries the
/// interceptor's `x-itest-tag: 1` header, and has an `x-request-id` header.
#[tokio::test(flavor = "multi_thread")]
async fn plugin_route_interceptor_and_request_id() {
    let base = boot(vec![Box::new(TestPlugin::default())], LaunchConfig::default()).await;

    let url = format!("{base}/itest/ping");
    let resp = reqwest::get(url).await.expect("ping");

    assert_eq!(resp.status(), 200);

    let tag = resp.headers().get("x-itest-tag").map(|v| v.as_bytes());
    assert_eq!(tag, Some(&b"1"[..]));

    assert!(resp.headers().contains_key("x-request-id"));

    let body = resp.text().await.expect("body");
    assert_eq!(body, r#""pong""#);
}
/// An `x-request-id` supplied by the caller comes back unchanged on the
/// response.
#[tokio::test(flavor = "multi_thread")]
async fn inbound_request_id_is_honoured() {
    let base = boot(vec![Box::new(TestPlugin::default())], LaunchConfig::default()).await;

    let request = reqwest::Client::new()
        .get(format!("{base}/itest/ping"))
        .header("x-request-id", "gateway-rid-42");
    let resp = request.send().await.expect("send");

    let echoed = resp
        .headers()
        .get("x-request-id")
        .and_then(|v| v.to_str().ok());
    assert_eq!(echoed, Some("gateway-rid-42"));
}
/// A request carrying `x-deny` is answered with 403, and that response still
/// has an `x-request-id` header.
#[tokio::test(flavor = "multi_thread")]
async fn boundary_filter_rejects_before_body() {
    let base = boot(vec![Box::new(TestPlugin::default())], LaunchConfig::default()).await;

    let request = reqwest::Client::new()
        .get(format!("{base}/itest/ping"))
        .header("x-deny", "1");
    let resp = request.send().await.expect("send");

    assert_eq!(resp.status(), 403);
    assert!(resp.headers().contains_key("x-request-id"));
}
/// With `request_timeout` set to 200 ms, the two-second `/itest/slow` handler
/// must be answered with 504.
#[tokio::test(flavor = "multi_thread")]
async fn governor_deadline_returns_504() {
    let plugin = TestPlugin {
        slow_route: true,
        ..Default::default()
    };
    let config = LaunchConfig {
        request_timeout: Duration::from_millis(200),
        ..Default::default()
    };
    let base = boot(vec![Box::new(plugin)], config).await;

    let url = format!("{base}/itest/slow");
    let resp = reqwest::get(url).await.expect("slow");

    assert_eq!(resp.status(), 504);
}
/// With `max_in_flight` set to 1, a request sent while a slow request is
/// still running is answered with 503 and `retry-after: 1`.
#[tokio::test(flavor = "multi_thread")]
async fn admission_cap_sheds_with_503() {
    let plugin = TestPlugin {
        slow_route: true,
        ..Default::default()
    };
    let config = LaunchConfig {
        max_in_flight: 1,
        request_timeout: Duration::from_secs(5),
        ..Default::default()
    };
    let base = boot(vec![Box::new(plugin)], config).await;

    // Occupy the single slot with the two-second route on a separate task.
    let slow_url = format!("{base}/itest/slow");
    let hog = tokio::spawn(async move { reqwest::get(slow_url).await });

    // Give the hog request time to reach the server before the second one.
    tokio::time::sleep(Duration::from_millis(150)).await;

    let ping_url = format!("{base}/itest/ping");
    let resp = reqwest::get(ping_url).await.expect("ping");

    assert_eq!(resp.status(), 503);
    let retry_after = resp
        .headers()
        .get("retry-after")
        .and_then(|v| v.to_str().ok());
    assert_eq!(retry_after, Some("1"));

    hog.abort();
}
/// CORS configured for `http://spa.test`:
/// - a preflight from that origin gets 204 with allow-origin echoing the
///   origin and allow-credentials `true`;
/// - a preflight from `http://evil.test` gets 403;
/// - an actual GET from the allowed origin carries the allow-origin header.
#[tokio::test(flavor = "multi_thread")]
async fn cors_preflight_and_actual_request() {
    let allowed = CorsConfig::for_origins(["http://spa.test"]);
    let config = LaunchConfig {
        cors: Some(allowed),
        ..Default::default()
    };
    let base = boot(vec![Box::new(TestPlugin::default())], config).await;

    let ping = format!("{base}/itest/ping");
    let client = reqwest::Client::new();

    // Preflight from the allowed origin.
    let pf = client
        .request(reqwest::Method::OPTIONS, &ping)
        .header("origin", "http://spa.test")
        .header("access-control-request-method", "GET")
        .send()
        .await
        .expect("preflight");
    assert_eq!(pf.status(), 204);
    let pf_origin = pf
        .headers()
        .get("access-control-allow-origin")
        .and_then(|v| v.to_str().ok());
    assert_eq!(pf_origin, Some("http://spa.test"));
    let pf_credentials = pf
        .headers()
        .get("access-control-allow-credentials")
        .and_then(|v| v.to_str().ok());
    assert_eq!(pf_credentials, Some("true"));

    // Preflight from an origin that is not on the list.
    let bad = client
        .request(reqwest::Method::OPTIONS, &ping)
        .header("origin", "http://evil.test")
        .header("access-control-request-method", "GET")
        .send()
        .await
        .expect("bad preflight");
    assert_eq!(bad.status(), 403);

    // Actual request from the allowed origin.
    let ok = client
        .get(&ping)
        .header("origin", "http://spa.test")
        .send()
        .await
        .expect("actual");
    let ok_origin = ok
        .headers()
        .get("access-control-allow-origin")
        .and_then(|v| v.to_str().ok());
    assert_eq!(ok_origin, Some("http://spa.test"));
}
/// The route mounted by `TestPlugin::on_start` is reachable at
/// `/_plugins/itest/dyn` and returns `"dynamic"`; an unknown path under
/// `/_plugins` returns 404.
///
/// NOTE(review): despite the name, no unmount is exercised here — only the
/// mounted route and a never-mounted path are checked.
#[tokio::test(flavor = "multi_thread")]
async fn dynamic_routes_mount_and_unmount_live() {
    let base = boot(vec![Box::new(TestPlugin::default())], LaunchConfig::default()).await;

    let mounted_url = format!("{base}/_plugins/itest/dyn");
    let resp = reqwest::get(mounted_url).await.expect("dyn");
    assert_eq!(resp.status(), 200);
    let body = resp.text().await.expect("body");
    assert_eq!(body, r#""dynamic""#);

    let unknown_url = format!("{base}/_plugins/nope");
    let missing = reqwest::get(unknown_url).await.expect("missing");
    assert_eq!(missing.status(), 404);
}
/// `/openapi.json` answers 200 with `content-type: application/json`, and the
/// document's `info.title` matches the title passed at launch.
#[tokio::test(flavor = "multi_thread")]
async fn openapi_is_served_as_static_json() {
    let base = boot(vec![], LaunchConfig::default()).await;

    let url = format!("{base}/openapi.json");
    let resp = reqwest::get(url).await.expect("spec");

    assert_eq!(resp.status(), 200);
    let content_type = resp
        .headers()
        .get("content-type")
        .and_then(|v| v.to_str().ok());
    assert_eq!(content_type, Some("application/json"));

    let spec: serde_json::Value = resp.json().await.expect("valid json");
    assert_eq!(spec["info"]["title"], "itest");
}
/// Two plugins registering the same `GET /clash` route must make the launch
/// return an error whose message names the second plugin and the path.
#[tokio::test(flavor = "multi_thread")]
async fn route_collision_fails_launch_loudly() {
    /// Plugin that registers `GET /clash`; the field is its reported name.
    struct Collider(&'static str);

    impl ArclyPlugin for Collider {
        fn name(&self) -> &'static str {
            self.0
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.add_route(HttpMethod::GET, "/clash", |_req| async {
                    json_response(200, "{}")
                });
                Ok(())
            })
        }
    }

    let addr = free_addr();
    let info = OpenApiInfo {
        title: "collide",
        version: "0",
        ..Default::default()
    };
    let plugins: Vec<Box<dyn ArclyPlugin>> =
        vec![Box::new(Collider("first")), Box::new(Collider("second"))];

    let outcome =
        App::launch_configured::<EmptyModule>(&addr, info, plugins, LaunchConfig::default())
            .await;
    let err = outcome.expect_err("duplicate plugin route must fail launch");

    let msg = err.to_string();
    assert!(
        msg.contains("second"),
        "error names the offending plugin: {msg}"
    );
    assert!(msg.contains("/clash"), "error names the path: {msg}");
}
/// A value provided by a plugin in `on_init` can be injected in a handler,
/// and the same instance is seen by every request: after three calls the
/// counter it holds has reached 3.
#[tokio::test(flavor = "multi_thread")]
async fn provider_from_plugin_resolves_in_handler() {
    /// Last count observed by the handler, readable from the test body.
    static SEEN: AtomicUsize = AtomicUsize::new(0);

    /// Provided value: counts how many times the handler ran.
    struct Counter(AtomicUsize);

    struct ProviderPlugin;

    impl ArclyPlugin for ProviderPlugin {
        fn name(&self) -> &'static str {
            "provider-plugin"
        }

        fn on_init<'a>(
            &'a mut self,
            ctx: &'a mut ArclyPluginContext,
        ) -> BoxFuture<'a, Result<(), PluginError>> {
            Box::pin(async move {
                ctx.provide(Counter(AtomicUsize::new(0)));
                ctx.add_get("/counted", |rctx| async move {
                    let counter = rctx.inject::<Counter>();
                    let previous = counter.0.fetch_add(1, Ordering::Relaxed);
                    SEEN.store(previous + 1, Ordering::Relaxed);
                    json_response(200, "{}")
                });
                Ok(())
            })
        }
    }

    let base = boot(vec![Box::new(ProviderPlugin)], LaunchConfig::default()).await;

    let url = format!("{base}/counted");
    for _ in 0..3 {
        let status = reqwest::get(&url).await.expect("req").status();
        assert_eq!(status, 200);
    }

    let observed = SEEN.load(Ordering::Relaxed);
    assert_eq!(observed, 3, "singleton state persisted across requests");
}