use axum::body::Body;
use http::Request as HttpRequest;
use tower::ServiceExt;
use taut_rpc::ir::{HttpMethod, Primitive, ProcKind, TypeRef};
use taut_rpc::procedure::ProcedureBody;
use taut_rpc::ProcKindRuntime;
#[derive(serde::Serialize, serde::Deserialize, taut_rpc::Type, taut_rpc::Validate)]
#[allow(dead_code)]
struct TicksInput {
count: u64,
}
#[taut_rpc::rpc(stream)]
#[allow(clippy::unused_async)] async fn ticks(input: TicksInput) -> impl futures::Stream<Item = u64> + Send + 'static {
async_stream::stream! {
for i in 0..input.count {
yield i;
}
}
}
#[taut_rpc::rpc(stream)]
#[allow(clippy::unused_async)] async fn ticks_simple() -> impl futures::Stream<Item = u32> + Send + 'static {
async_stream::stream! {
for i in 0..3u32 {
yield i;
}
}
}
#[taut_rpc::rpc]
#[allow(clippy::unused_async)] async fn ping_unary() -> String {
"pong".to_string()
}
async fn body_string(response: axum::http::Response<Body>) -> String {
let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.expect("read response body");
String::from_utf8(bytes.to_vec()).expect("response body is utf-8")
}
#[test]
fn stream_macro_emits_subscription_descriptor() {
let desc = __taut_proc_ticks();
assert_eq!(desc.name, "ticks");
assert_eq!(desc.kind, ProcKindRuntime::Subscription);
assert_eq!(desc.ir.kind, ProcKind::Subscription);
assert_eq!(desc.ir.http_method, HttpMethod::Get);
assert!(
matches!(desc.body, ProcedureBody::Stream(_)),
"expected ProcedureBody::Stream, got something else"
);
}
#[test]
fn zero_input_subscription_descriptor() {
let desc = __taut_proc_ticks_simple();
assert_eq!(desc.name, "ticks_simple");
assert_eq!(desc.ir.kind, ProcKind::Subscription);
assert_eq!(desc.ir.http_method, HttpMethod::Get);
assert_eq!(
desc.ir.input,
TypeRef::Primitive(Primitive::Unit),
"zero-arg subscriptions must carry `Primitive(Unit)` as their input TypeRef",
);
}
#[tokio::test]
async fn subscription_route_emits_data_frames_then_end() {
let app = taut_rpc::Router::new()
.procedure(__taut_proc_ticks_simple())
.into_axum();
let response = app
.oneshot(
HttpRequest::builder()
.method("GET")
.uri("/rpc/ticks_simple?input=null")
.body(Body::empty())
.unwrap(),
)
.await
.expect("oneshot dispatch");
assert_eq!(
response.status(),
http::StatusCode::OK,
"subscription GET should return 200 OK before any frames flow",
);
let body = body_string(response).await;
assert!(
body.contains("event: data\ndata: 0\n\n"),
"expected `event: data\\ndata: 0\\n\\n` in body, got: {body}"
);
assert!(
body.contains("event: data\ndata: 1\n\n"),
"expected `event: data\\ndata: 1\\n\\n` in body, got: {body}"
);
assert!(
body.contains("event: data\ndata: 2\n\n"),
"expected `event: data\\ndata: 2\\n\\n` in body, got: {body}"
);
assert!(
body.contains("event: end"),
"expected `event: end` terminator in body, got: {body}"
);
}
#[tokio::test]
async fn subscription_input_decode_error_emits_error_frame() {
let app = taut_rpc::Router::new()
.procedure(__taut_proc_ticks())
.into_axum();
let response = app
.oneshot(
HttpRequest::builder()
.method("GET")
.uri("/rpc/ticks?input=not_json")
.body(Body::empty())
.unwrap(),
)
.await
.expect("oneshot dispatch");
let body = body_string(response).await;
assert!(
body.contains("event: error"),
"expected `event: error` in body, got: {body}"
);
assert!(
body.contains(r#""code":"decode_error""#),
"expected `\"code\":\"decode_error\"` in body, got: {body}"
);
}
#[tokio::test]
async fn subscription_with_no_query_param_decodes_as_null() {
let app = taut_rpc::Router::new()
.procedure(__taut_proc_ticks_simple())
.into_axum();
let response = app
.oneshot(
HttpRequest::builder()
.method("GET")
.uri("/rpc/ticks_simple")
.body(Body::empty())
.unwrap(),
)
.await
.expect("oneshot dispatch");
assert_eq!(response.status(), http::StatusCode::OK);
let body = body_string(response).await;
assert!(
body.contains("event: data\ndata: 0\n\n"),
"missing tick 0: {body}"
);
assert!(
body.contains("event: data\ndata: 1\n\n"),
"missing tick 1: {body}"
);
assert!(
body.contains("event: data\ndata: 2\n\n"),
"missing tick 2: {body}"
);
assert!(body.contains("event: end"), "missing end frame: {body}");
}
#[ignore = "no convenient way to construct a guaranteed-failing Serialize \
impl for a stream Item type — see comment above for context"]
#[tokio::test]
async fn subscription_serialization_failure_emits_error_frame() {
}
#[tokio::test]
async fn unary_and_stream_can_coexist_on_same_router() {
let app = taut_rpc::Router::new()
.procedure(__taut_proc_ping_unary())
.procedure(__taut_proc_ticks_simple())
.into_axum();
let unary = app
.clone()
.oneshot(
HttpRequest::builder()
.method("POST")
.uri("/rpc/ping_unary")
.header("content-type", "application/json")
.body(Body::from(r#"{"input":null}"#))
.unwrap(),
)
.await
.expect("unary oneshot dispatch");
assert_eq!(unary.status(), http::StatusCode::OK);
let unary_body = body_string(unary).await;
let unary_json: serde_json::Value = serde_json::from_str(&unary_body).expect("unary body json");
assert_eq!(
unary_json,
serde_json::json!({ "ok": "pong" }),
"unary procedure must serve `{{ \"ok\": <output> }}` over POST",
);
let streaming = app
.oneshot(
HttpRequest::builder()
.method("GET")
.uri("/rpc/ticks_simple")
.body(Body::empty())
.unwrap(),
)
.await
.expect("streaming oneshot dispatch");
assert_eq!(streaming.status(), http::StatusCode::OK);
let stream_body = body_string(streaming).await;
assert!(
stream_body.contains("event: data\ndata: 0\n\n"),
"missing tick 0: {stream_body}"
);
assert!(
stream_body.contains("event: end"),
"missing end frame: {stream_body}"
);
}
#[tokio::test]
async fn router_excludes_subscription_routes_from_post_dispatch() {
let app = taut_rpc::Router::new()
.procedure(__taut_proc_ticks_simple())
.into_axum();
let response = app
.oneshot(
HttpRequest::builder()
.method("POST")
.uri("/rpc/ticks_simple")
.header("content-type", "application/json")
.body(Body::from(r#"{"input":null}"#))
.unwrap(),
)
.await
.expect("oneshot dispatch");
let status = response.status();
assert!(
status == http::StatusCode::NOT_FOUND || status == http::StatusCode::METHOD_NOT_ALLOWED,
"POST to a subscription must return 404 or 405, got {status}",
);
}