use std::sync::Arc;
use axum::body::Body;
use axum::http::{header, Request, StatusCode};
use tower::ServiceExt;
use vgi_rpc::http::{HttpState, ARROW_CONTENT_TYPE};
use vgi_rpc::{MethodInfo, RpcServer};
fn state_with(
cors: Option<&str>,
prefix: Option<&str>,
compression: Option<i32>,
) -> Arc<HttpState> {
let mut srv = RpcServer::builder()
.server_id("it")
.protocol_name("Test")
.enable_describe(true)
.build();
srv.register(
MethodInfo::unary(
"echo_string",
arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"value",
arrow_schema::DataType::Utf8,
false,
)])
.into(),
arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"result",
arrow_schema::DataType::Utf8,
false,
)])
.into(),
|_req, _| Ok(None),
)
.doc("stub echo")
.param_type("value", "str"),
);
let mut b = HttpState::builder().server(Arc::new(srv));
if let Some(o) = cors {
b = b.cors_origins(o);
}
if let Some(p) = prefix {
b = b.prefix(p);
}
if let Some(l) = compression {
b = b.response_compression_level(l);
}
b.build()
}
/// A request to `/health` on a router built from default test state must
/// answer with `200 OK`.
#[tokio::test]
async fn health_endpoint_returns_ok() {
    let router = vgi_rpc::http::build_router(state_with(None, None, None));
    let request = Request::builder()
        .uri("/health")
        .body(Body::empty())
        .unwrap();

    let response = router.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::OK);
}
/// An `OPTIONS` preflight for `/echo_string` from the configured origin must
/// return `204 No Content`, echo that origin in
/// `Access-Control-Allow-Origin`, and carry an `Access-Control-Max-Age`
/// header.
#[tokio::test]
async fn preflight_includes_cors_headers() {
    let origin = "https://app.example.com";
    let router = vgi_rpc::http::build_router(state_with(Some(origin), None, None));

    let preflight = Request::builder()
        .method("OPTIONS")
        .uri("/echo_string")
        .header(header::ORIGIN, origin)
        .header(header::ACCESS_CONTROL_REQUEST_METHOD, "POST")
        .header(header::ACCESS_CONTROL_REQUEST_HEADERS, "content-type")
        .body(Body::empty())
        .unwrap();
    let response = router.oneshot(preflight).await.unwrap();

    assert_eq!(response.status(), StatusCode::NO_CONTENT);

    let headers = response.headers();
    let allowed_origin = headers.get(header::ACCESS_CONTROL_ALLOW_ORIGIN).unwrap();
    assert_eq!(allowed_origin, origin);
    assert!(headers.contains_key(header::ACCESS_CONTROL_MAX_AGE));
}
/// With the prefix set to `/v1`, this test pins two observations about the
/// health route: `/health` still answers `200 OK`, while `/v1/health` must
/// answer with anything other than `200 OK`.
#[tokio::test]
async fn prefix_mounts_routes_under_path() {
    let router = vgi_rpc::http::build_router(state_with(None, Some("/v1"), None));
    // Both probes are body-less requests that differ only in their path.
    let probe = |path: &str| Request::builder().uri(path).body(Body::empty()).unwrap();

    // `oneshot` consumes the service, so the first probe runs on a clone.
    let at_root = router.clone().oneshot(probe("/health")).await.unwrap();
    assert_eq!(at_root.status(), StatusCode::OK);

    let under_prefix = router.oneshot(probe("/v1/health")).await.unwrap();
    assert!(
        under_prefix.status() != StatusCode::OK,
        "/v1/health must NOT be the health endpoint when prefix=/v1"
    );
}
/// A request to `/describe` must return `200 OK` with a `Content-Type` that
/// starts with `text/html`. The shared test state enables describe on the
/// server.
#[tokio::test]
async fn describe_html_page_served() {
    let router = vgi_rpc::http::build_router(state_with(None, None, None));
    let request = Request::builder()
        .uri("/describe")
        .body(Body::empty())
        .unwrap();

    let response = router.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let content_type = response
        .headers()
        .get(header::CONTENT_TYPE)
        .unwrap()
        .to_str()
        .unwrap();
    assert!(content_type.starts_with("text/html"));
}
/// With a response compression level configured and the client sending
/// `Accept-Encoding: zstd`, an empty-bodied POST to `/echo_string` must be
/// answered with `400 Bad Request` carrying `Content-Encoding: zstd`.
#[tokio::test]
async fn compression_emits_zstd_encoded_response_on_error() {
    let router = vgi_rpc::http::build_router(state_with(None, None, Some(3)));

    // An empty body is what triggers the 400 this test expects.
    let request = Request::builder()
        .method("POST")
        .uri("/echo_string")
        .header(header::CONTENT_TYPE, ARROW_CONTENT_TYPE)
        .header(header::ACCEPT_ENCODING, "zstd")
        .body(Body::from(vec![]))
        .unwrap();
    let response = router.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);

    let encoding = response
        .headers()
        .get(header::CONTENT_ENCODING)
        .map(|value| value.to_str().unwrap());
    assert_eq!(encoding, Some("zstd"));
}
fn state_configured(
f: impl FnOnce(vgi_rpc::http::HttpStateBuilder) -> vgi_rpc::http::HttpStateBuilder,
) -> Arc<HttpState> {
let mut srv = RpcServer::builder().server_id("it").build();
srv.register(MethodInfo::unary(
"echo_string",
arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"value",
arrow_schema::DataType::Utf8,
false,
)])
.into(),
arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"result",
arrow_schema::DataType::Utf8,
false,
)])
.into(),
|_req, _| Ok(None),
));
f(HttpState::builder().server(Arc::new(srv))).build()
}
/// With `max_body_size` set to 16, posting a 4096-byte body to
/// `/echo_string` must be answered with `413 Payload Too Large`.
#[tokio::test]
async fn oversize_request_body_is_rejected_by_body_limit() {
    let router =
        vgi_rpc::http::build_router(state_configured(|builder| builder.max_body_size(16)));

    // 4096 zero bytes, far above the 16 passed to `max_body_size`.
    let oversized = Body::from(vec![0u8; 4096]);
    let request = Request::builder()
        .method("POST")
        .uri("/echo_string")
        .header(header::CONTENT_TYPE, ARROW_CONTENT_TYPE)
        .body(oversized)
        .unwrap();

    let response = router.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE);
}
/// With `max_response_bytes` set to 8, an empty-bodied POST to
/// `/echo_string` must still be answered with `400 Bad Request`.
///
/// NOTE(review): per the test name, the intent appears to be that the
/// response cap does not interfere with this small error response; confirm
/// against the cap's implementation.
#[tokio::test]
async fn small_response_under_soft_cap_is_not_rejected() {
    let router =
        vgi_rpc::http::build_router(state_configured(|builder| builder.max_response_bytes(8)));

    let request = Request::builder()
        .method("POST")
        .uri("/echo_string")
        .header(header::CONTENT_TYPE, ARROW_CONTENT_TYPE)
        .body(Body::from(vec![]))
        .unwrap();

    let response = router.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}