#![cfg(feature = "grpc")]
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use typeway_core::endpoint::{DeleteEndpoint, GetEndpoint, PostEndpoint};
use typeway_core::path::{Capture, HCons, HNil, Lit, LitSegment};
use typeway_grpc::mapping::ToProtoType;
use typeway_server::*;
#[allow(non_camel_case_types)]
struct users;
impl LitSegment for users {
const VALUE: &'static str = "users";
}
type UsersPath = HCons<Lit<users>, HNil>;
type UserByIdPath = HCons<Lit<users>, HCons<Capture<u32>, HNil>>;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct User {
id: u32,
name: String,
}
impl ToProtoType for User {
fn proto_type_name() -> &'static str {
"User"
}
fn is_message() -> bool {
true
}
fn message_definition() -> Option<String> {
Some("message User {\n uint32 id = 1;\n string name = 2;\n}".to_string())
}
}
#[derive(Debug, Deserialize)]
struct CreateUser {
name: String,
}
impl ToProtoType for CreateUser {
fn proto_type_name() -> &'static str {
"CreateUser"
}
fn is_message() -> bool {
true
}
fn message_definition() -> Option<String> {
Some("message CreateUser {\n string name = 1;\n}".to_string())
}
}
type AppState = Arc<std::sync::Mutex<Vec<User>>>;
async fn list_users(state: State<AppState>) -> Json<Vec<User>> {
Json(state.0.lock().unwrap().clone())
}
async fn get_user(
path: Path<UserByIdPath>,
state: State<AppState>,
) -> Result<Json<User>, http::StatusCode> {
let (id,) = path.0;
let all = state.0.lock().unwrap();
all.iter()
.find(|u| u.id == id)
.cloned()
.map(Json)
.ok_or(http::StatusCode::NOT_FOUND)
}
async fn create_user(
state: State<AppState>,
body: Json<CreateUser>,
) -> (http::StatusCode, Json<User>) {
let mut all = state.0.lock().unwrap();
let id = all.len() as u32 + 1;
let user = User {
id,
name: body.0.name,
};
all.push(user.clone());
(http::StatusCode::CREATED, Json(user))
}
async fn delete_user(path: Path<UserByIdPath>, state: State<AppState>) -> http::StatusCode {
let (id,) = path.0;
let mut all = state.0.lock().unwrap();
if let Some(pos) = all.iter().position(|u| u.id == id) {
all.remove(pos);
http::StatusCode::NO_CONTENT
} else {
http::StatusCode::NOT_FOUND
}
}
type TestAPI = (
GetEndpoint<UsersPath, Vec<User>>,
GetEndpoint<UserByIdPath, User>,
PostEndpoint<UsersPath, CreateUser, User>,
DeleteEndpoint<UserByIdPath, ()>,
);
async fn start_grpc_server() -> u16 {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![
User {
id: 1,
name: "Alice".into(),
},
User {
id: 2,
name: "Bob".into(),
},
]));
let grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
port
}
#[test]
fn server_with_grpc_compiles() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![]));
let _grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
}
#[tokio::test]
async fn grpc_server_serves_rest() {
let port = start_grpc_server().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user_list: Vec<User> = resp.json().await.unwrap();
assert_eq!(user_list.len(), 2);
assert_eq!(user_list[0].name, "Alice");
assert_eq!(user_list[1].name, "Bob");
}
#[tokio::test]
async fn grpc_server_serves_rest_with_captures() {
let port = start_grpc_server().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users/1"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user: User = resp.json().await.unwrap();
assert_eq!(user.id, 1);
assert_eq!(user.name, "Alice");
}
#[tokio::test]
async fn grpc_server_serves_grpc() {
let port = start_grpc_server().await;
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/ListUser"
))
.header("content-type", "application/grpc+json")
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let ct = resp
.headers()
.get("content-type")
.expect("missing content-type header")
.to_str()
.unwrap();
assert!(ct.starts_with("application/grpc"));
}
#[tokio::test]
async fn grpc_unknown_method_returns_unimplemented() {
let port = start_grpc_server().await;
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/UpdateUser"
))
.header("content-type", "application/grpc+json")
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn grpc_404_maps_to_not_found() {
let port = start_grpc_server().await;
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/GetUser"
))
.header("content-type", "application/grpc+json")
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let _resp_body = resp.bytes().await.unwrap();
}
#[tokio::test]
async fn grpc_server_with_state() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 42,
name: "StatefulUser".into(),
}]));
let grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user_list: Vec<User> = resp.json().await.unwrap();
assert_eq!(user_list.len(), 1);
assert_eq!(user_list[0].id, 42);
assert_eq!(user_list[0].name, "StatefulUser");
}
#[test]
fn grpc_server_exposes_service_descriptor() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![]));
let grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
let desc = grpc_server.service_descriptor();
assert_eq!(desc.name, "UserService");
assert_eq!(desc.package, "users.v1");
assert_eq!(desc.methods.len(), 4);
let names: Vec<&str> = desc.methods.iter().map(|m| m.name.as_str()).collect();
assert!(names.contains(&"ListUser"));
assert!(names.contains(&"GetUser"));
assert!(names.contains(&"CreateUser"));
assert!(names.contains(&"DeleteUser"));
}
#[tokio::test]
async fn grpc_test_client_list_users() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("users.v1.UserService", "ListUser").await;
assert!(resp.is_ok());
assert_eq!(resp.grpc_code(), typeway_grpc::GrpcCode::Ok);
let body = resp.json();
assert!(body.is_array());
let arr = body.as_array().unwrap();
assert_eq!(arr.len(), 2);
assert_eq!(arr[0]["name"], "Alice");
assert_eq!(arr[1]["name"], "Bob");
}
#[tokio::test]
async fn grpc_test_client_unknown_method() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_empty("users.v1.UserService", "DoesNotExist")
.await;
assert!(!resp.is_ok());
assert_eq!(resp.grpc_code(), typeway_grpc::GrpcCode::Unimplemented);
}
#[tokio::test]
async fn grpc_test_client_create_user() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call(
"users.v1.UserService",
"CreateUser",
serde_json::json!({"name": "Charlie"}),
)
.await;
assert!(resp.is_ok());
let body = resp.json();
assert_eq!(body["name"], "Charlie");
assert!(body["id"].as_u64().unwrap() > 0);
}
#[tokio::test]
async fn grpc_response_is_framed() {
let port = start_grpc_server().await;
let json_bytes = serde_json::to_vec(&serde_json::json!({})).unwrap();
let framed_req = typeway_grpc::framing::encode_grpc_frame(&json_bytes);
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/ListUser"
))
.header("content-type", "application/grpc+json")
.body(framed_req)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body_bytes = resp.bytes().await.unwrap();
assert!(body_bytes.len() >= 5, "response too short for gRPC frame");
assert_eq!(body_bytes[0], 0, "compression flag should be 0");
let declared_len =
u32::from_be_bytes([body_bytes[1], body_bytes[2], body_bytes[3], body_bytes[4]]) as usize;
assert_eq!(body_bytes.len(), 5 + declared_len, "frame length mismatch");
let unframed = typeway_grpc::framing::decode_grpc_frame(&body_bytes).unwrap();
let parsed: serde_json::Value = serde_json::from_slice(unframed).unwrap();
assert!(parsed.is_array());
}
#[tokio::test]
async fn grpc_timeout_header_propagated() {
let port = start_grpc_server().await;
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/ListUser"
))
.header("content-type", "application/grpc+json")
.header("grpc-timeout", "30S")
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let _resp_body = resp.bytes().await.unwrap();
}
#[tokio::test]
async fn grpc_timeout_exceeded_returns_deadline_exceeded() {
async fn slow_list_users(state: State<AppState>) -> Json<Vec<User>> {
tokio::time::sleep(Duration::from_millis(200)).await;
Json(state.0.lock().unwrap().clone())
}
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(slow_list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/ListUser"
))
.header("content-type", "application/grpc+json")
.header("grpc-timeout", "10m")
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let _resp_body = resp.bytes().await.unwrap();
}
#[test]
fn with_state_then_with_grpc() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![]));
let _grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("Svc", "pkg.v1");
}
#[tokio::test]
async fn grpc_e2e_list_users() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("users.v1.UserService", "ListUser").await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
assert_eq!(resp.grpc_code(), typeway_grpc::GrpcCode::Ok);
let body = resp.json();
assert!(body.is_array(), "expected JSON array, got {body}");
let arr = body.as_array().unwrap();
assert_eq!(arr.len(), 2);
assert_eq!(arr[0]["id"], 1);
assert_eq!(arr[0]["name"], "Alice");
assert_eq!(arr[1]["id"], 2);
assert_eq!(arr[1]["name"], "Bob");
}
#[tokio::test]
async fn grpc_e2e_create_user() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call(
"users.v1.UserService",
"CreateUser",
serde_json::json!({"name": "Dave"}),
)
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let body = resp.json();
assert_eq!(body["name"], "Dave");
assert!(
body["id"].as_u64().unwrap() > 0,
"expected positive id, got {}",
body["id"]
);
}
#[tokio::test]
async fn grpc_e2e_health_check() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("grpc.health.v1.Health", "Check").await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let body = resp.json();
assert_eq!(body["status"], "SERVING", "expected SERVING, got {body}");
}
#[tokio::test]
async fn grpc_e2e_reflection() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call(
"grpc.reflection.v1alpha.ServerReflection",
"ServerReflectionInfo",
serde_json::json!({"list_services": ""}),
)
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let body = resp.json();
let body_str = body.to_string();
assert!(
body_str.contains("UserService"),
"expected reflection response to contain 'UserService', got: {body_str}"
);
}
#[tokio::test]
async fn grpc_e2e_unknown_method() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_empty("users.v1.UserService", "NonExistentMethod")
.await;
assert!(!resp.is_ok());
assert_eq!(
resp.grpc_code(),
typeway_grpc::GrpcCode::Unimplemented,
"expected UNIMPLEMENTED, got {:?}",
resp.grpc_code()
);
}
#[tokio::test]
async fn grpc_e2e_rest_still_works() {
let port = start_grpc_server().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user_list: Vec<User> = resp.json().await.unwrap();
assert_eq!(user_list.len(), 2);
assert_eq!(user_list[0].name, "Alice");
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users/2"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user: User = resp.json().await.unwrap();
assert_eq!(user.id, 2);
assert_eq!(user.name, "Bob");
let client = reqwest::Client::new();
let resp = client
.post(format!("http://127.0.0.1:{port}/users"))
.json(&serde_json::json!({"name": "Eve"}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 201);
let user: User = resp.json().await.unwrap();
assert_eq!(user.name, "Eve");
}
#[tokio::test]
async fn grpc_e2e_404_maps_correctly() {
let port = start_grpc_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("users.v1.UserService", "GetUser").await;
assert!(!resp.is_ok(), "expected gRPC error for empty GetUser call");
}
#[allow(non_camel_case_types)]
struct health;
impl LitSegment for health {
const VALUE: &'static str = "health";
}
type HealthPath = HCons<Lit<health>, HNil>;
async fn health_handler() -> String {
"ok".to_string()
}
#[tokio::test]
async fn grpc_with_effectful_server_and_cors() {
use typeway_core::effects::{CorsRequired, Requires};
use typeway_server::EffectfulServer;
type EffAPI = (
Requires<CorsRequired, GetEndpoint<UsersPath, Vec<User>>>,
GetEndpoint<HealthPath, String>,
);
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let server = EffectfulServer::<EffAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(health_handler),
))
.with_state(state)
.provide::<CorsRequired>()
.ready()
.with_grpc("EffService", "eff.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/health"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body = resp.text().await.unwrap();
assert!(
body.contains("ok"),
"expected body to contain 'ok', got: {body}"
);
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("eff.v1.EffService", "ListUser").await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let body = resp.json();
assert!(body.is_array());
assert_eq!(body.as_array().unwrap().len(), 1);
assert_eq!(body[0]["name"], "Alice");
}
#[tokio::test]
async fn grpc_with_validated_endpoint() {
use typeway_server::typed::Validate;
struct CreateUserValidator;
impl Validate<CreateUser> for CreateUserValidator {
fn validate(body: &CreateUser) -> Result<(), String> {
if body.name.is_empty() {
return Err("name must not be empty".to_string());
}
Ok(())
}
}
type ValidatedAPI = (
GetEndpoint<UsersPath, Vec<User>>,
typeway_server::typed::Validated<
CreateUserValidator,
PostEndpoint<UsersPath, CreateUser, User>,
>,
);
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let grpc_server =
Server::<ValidatedAPI>::new((bind::<_, _, _>(list_users), bind::<_, _, _>(create_user)))
.with_state(state)
.with_grpc("ValidatedService", "val.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_empty("val.v1.ValidatedService", "ListUser")
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let body = resp.json();
assert!(body.is_array());
assert_eq!(body.as_array().unwrap().len(), 1);
}
#[tokio::test]
async fn grpc_with_nested_wrappers() {
use typeway_core::effects::{CorsRequired, Requires};
use typeway_server::typed::Validate;
use typeway_server::EffectfulServer;
struct SimpleValidator;
impl Validate<CreateUser> for SimpleValidator {
fn validate(_body: &CreateUser) -> Result<(), String> {
Ok(())
}
}
type NestedAPI = (
GetEndpoint<UsersPath, Vec<User>>,
Requires<
CorsRequired,
typeway_server::typed::Validated<
SimpleValidator,
PostEndpoint<UsersPath, CreateUser, User>,
>,
>,
);
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let server = EffectfulServer::<NestedAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(create_user),
))
.with_state(state)
.provide::<CorsRequired>()
.ready()
.with_grpc("NestedService", "nested.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_empty("nested.v1.NestedService", "ListUser")
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
let resp = client
.call(
"nested.v1.NestedService",
"CreateUser",
serde_json::json!({"name": "Bob"}),
)
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
assert_eq!(resp.json()["name"], "Bob");
}
#[tokio::test]
async fn grpc_streaming_splits_json_array() {
use typeway_grpc::streaming::ServerStream;
type StreamingAPI = (
ServerStream<GetEndpoint<UsersPath, Vec<User>>>,
GetEndpoint<HealthPath, String>,
);
let state: AppState = Arc::new(std::sync::Mutex::new(vec![
User {
id: 1,
name: "Alice".into(),
},
User {
id: 2,
name: "Bob".into(),
},
User {
id: 3,
name: "Charlie".into(),
},
]));
let grpc_server =
Server::<StreamingAPI>::new((bind::<_, _, _>(list_users), bind::<_, _, _>(health_handler)))
.with_state(state)
.with_grpc("StreamService", "stream.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_streaming_empty("stream.v1.StreamService", "ListUser")
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
assert_eq!(
resp.len(),
3,
"expected 3 streamed items, got {}",
resp.len()
);
assert_eq!(resp.items[0]["name"], "Alice");
assert_eq!(resp.items[1]["name"], "Bob");
assert_eq!(resp.items[2]["name"], "Charlie");
}
#[tokio::test]
async fn grpc_non_streaming_returns_single_frame() {
use typeway_grpc::streaming::ServerStream;
type MixedStreamAPI = (
ServerStream<GetEndpoint<UsersPath, Vec<User>>>,
GetEndpoint<UserByIdPath, User>,
);
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let grpc_server =
Server::<MixedStreamAPI>::new((bind::<_, _, _>(list_users), bind::<_, _, _>(get_user)))
.with_state(state)
.with_grpc("MixedService", "mixed.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_streaming_empty("mixed.v1.MixedService", "ListUser")
.await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
assert_eq!(
resp.len(),
1,
"expected 1 streamed item (only 1 user in state), got {}",
resp.len()
);
assert_eq!(resp.items[0]["name"], "Alice");
let resp = client.call_empty("mixed.v1.MixedService", "GetUser").await;
assert!(!resp.is_ok(), "expected gRPC error for empty GetUser call");
}
#[test]
fn streaming_descriptor_has_server_streaming_flag() {
use typeway_grpc::service::ApiToServiceDescriptor;
use typeway_grpc::streaming::ServerStream;
type StreamAPI = (
ServerStream<GetEndpoint<UsersPath, Vec<User>>>,
GetEndpoint<HealthPath, String>,
);
let desc = StreamAPI::service_descriptor("StreamSvc", "stream.v1");
assert_eq!(desc.methods.len(), 2);
let list_method = desc.methods.iter().find(|m| m.name == "ListUser").unwrap();
assert!(
list_method.server_streaming,
"expected ListUser to be server_streaming"
);
assert!(
!list_method.client_streaming,
"expected ListUser to NOT be client_streaming"
);
let health_method = desc
.methods
.iter()
.find(|m| m.name == "ListHealth")
.unwrap();
assert!(
!health_method.server_streaming,
"expected GetHealth to NOT be server_streaming"
);
assert!(
!health_method.client_streaming,
"expected GetHealth to NOT be client_streaming"
);
}
async fn start_grpc_server_with_docs() -> u16 {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![
User {
id: 1,
name: "Alice".into(),
},
User {
id: 2,
name: "Bob".into(),
},
]));
let grpc_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
bind::<_, _, _>(delete_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1")
.with_grpc_docs();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
grpc_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
port
}
#[tokio::test]
async fn grpc_docs_endpoint_serves_html() {
let port = start_grpc_server_with_docs().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/grpc-docs"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let content_type = resp
.headers()
.get("content-type")
.expect("missing content-type")
.to_str()
.unwrap();
assert!(
content_type.contains("text/html"),
"expected text/html, got: {content_type}"
);
let body = resp.text().await.unwrap();
assert!(body.contains("<!DOCTYPE html>"), "expected HTML document");
assert!(
body.contains("UserService"),
"expected service name in HTML"
);
assert!(body.contains("ListUser"), "expected method name in HTML");
assert!(body.contains("GetUser"), "expected method name in HTML");
}
#[tokio::test]
async fn grpc_spec_endpoint_serves_json() {
let port = start_grpc_server_with_docs().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/grpc-spec"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let content_type = resp
.headers()
.get("content-type")
.expect("missing content-type")
.to_str()
.unwrap();
assert!(
content_type.contains("application/json"),
"expected application/json, got: {content_type}"
);
let spec: typeway_grpc::GrpcServiceSpec = resp.json().await.unwrap();
assert_eq!(spec.service.name, "UserService");
assert_eq!(spec.service.package, "users.v1");
assert_eq!(spec.service.full_name, "users.v1.UserService");
assert!(!spec.methods.is_empty(), "expected methods in spec");
assert!(spec.methods.contains_key("ListUser"));
assert!(spec.methods.contains_key("GetUser"));
assert!(spec.methods.contains_key("CreateUser"));
assert!(spec.methods.contains_key("DeleteUser"));
assert!(!spec.proto.is_empty(), "expected proto content in spec");
}
#[tokio::test]
async fn grpc_docs_not_served_without_with_grpc_docs() {
let port = start_grpc_server().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/grpc-docs"))
.await
.unwrap();
assert_eq!(resp.status(), 404);
let resp = reqwest::get(format!("http://127.0.0.1:{port}/grpc-spec"))
.await
.unwrap();
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn grpc_docs_does_not_break_rest_or_grpc() {
let port = start_grpc_server_with_docs().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let user_list: Vec<User> = resp.json().await.unwrap();
assert_eq!(user_list.len(), 2);
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client.call_empty("users.v1.UserService", "ListUser").await;
assert!(resp.is_ok(), "expected gRPC OK, got {:?}", resp.grpc_code());
}