#![cfg(feature = "grpc")]
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use typeway_core::endpoint::{GetEndpoint, PostEndpoint};
use typeway_core::path::{Capture, HCons, HNil, Lit, LitSegment};
use typeway_grpc::mapping::ToProtoType;
use typeway_grpc::streaming::ServerStream;
use typeway_server::*;
#[allow(non_camel_case_types)]
struct __lit_users;
impl LitSegment for __lit_users {
const VALUE: &'static str = "users";
}
type UsersPath = HCons<Lit<__lit_users>, HNil>;
type UserByIdPath = HCons<Lit<__lit_users>, HCons<Capture<u32>, HNil>>;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct User {
id: u32,
name: String,
}
impl ToProtoType for User {
fn proto_type_name() -> &'static str {
"User"
}
fn is_message() -> bool {
true
}
fn message_definition() -> Option<String> {
Some("message User {\n uint32 id = 1;\n string name = 2;\n}".to_string())
}
}
#[derive(Debug, Deserialize)]
struct CreateUser {
name: String,
}
impl ToProtoType for CreateUser {
fn proto_type_name() -> &'static str {
"CreateUser"
}
fn is_message() -> bool {
true
}
fn message_definition() -> Option<String> {
Some("message CreateUser {\n string name = 1;\n}".to_string())
}
}
type AppState = Arc<std::sync::Mutex<Vec<User>>>;
async fn list_users(state: State<AppState>) -> Json<Vec<User>> {
Json(state.0.lock().unwrap().clone())
}
async fn get_user(
path: Path<UserByIdPath>,
state: State<AppState>,
) -> Result<Json<User>, http::StatusCode> {
let (id,) = path.0;
let all = state.0.lock().unwrap();
all.iter()
.find(|u| u.id == id)
.cloned()
.map(Json)
.ok_or(http::StatusCode::NOT_FOUND)
}
async fn create_user(
state: State<AppState>,
body: Json<CreateUser>,
) -> (http::StatusCode, Json<User>) {
let mut all = state.0.lock().unwrap();
let id = all.len() as u32 + 1;
let user = User {
id,
name: body.0.name,
};
all.push(user.clone());
(http::StatusCode::CREATED, Json(user))
}
type TestAPI = (
GetEndpoint<UsersPath, Vec<User>>,
GetEndpoint<UserByIdPath, User>,
PostEndpoint<UsersPath, CreateUser, User>,
);
async fn start_native_grpc_server() -> u16 {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![
User {
id: 1,
name: "Alice".into(),
},
User {
id: 2,
name: "Bob".into(),
},
]));
let native_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
native_server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
port
}
#[test]
fn native_grpc_server_compiles() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![]));
let _native_server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1");
}
#[tokio::test]
async fn native_serves_rest() {
let port = start_native_grpc_server().await;
let resp = reqwest::get(format!("http://127.0.0.1:{port}/users"))
.await
.unwrap();
assert_eq!(resp.status(), 200);
let users: Vec<User> = resp.json().await.unwrap();
assert_eq!(users.len(), 2);
assert_eq!(users[0].name, "Alice");
}
#[tokio::test]
async fn native_serves_grpc_json() {
let port = start_native_grpc_server().await;
let client = reqwest::Client::builder()
.http2_prior_knowledge()
.build()
.unwrap();
let body = typeway_grpc::framing::encode_grpc_frame(b"{}");
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/ListUser"
))
.header("content-type", "application/grpc+json")
.header("te", "trailers")
.body(body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body_bytes = resp.bytes().await.unwrap();
assert!(!body_bytes.is_empty(), "response body should not be empty");
}
#[tokio::test]
async fn native_unimplemented_method() {
let port = start_native_grpc_server().await;
let client = reqwest::Client::builder()
.http2_prior_knowledge()
.build()
.unwrap();
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/NonExistent"
))
.header("content-type", "application/grpc+json")
.header("te", "trailers")
.body(vec![])
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn native_health_check() {
let port = start_native_grpc_server().await;
let client = reqwest::Client::builder()
.http2_prior_knowledge()
.build()
.unwrap();
let body = typeway_grpc::framing::encode_grpc_frame(b"{}");
let resp = client
.post(format!(
"http://127.0.0.1:{port}/grpc.health.v1.Health/Check"
))
.header("content-type", "application/grpc+json")
.header("te", "trailers")
.body(body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let grpc_status = resp
.headers()
.get("grpc-status")
.and_then(|v| v.to_str().ok())
.unwrap_or("?");
assert_eq!(grpc_status, "0");
}
#[tokio::test]
async fn native_grpc_create_user() {
let port = start_native_grpc_server().await;
let client = reqwest::Client::builder()
.http2_prior_knowledge()
.build()
.unwrap();
let req_json = serde_json::json!({"name": "Charlie"});
let req_bytes = serde_json::to_vec(&req_json).unwrap();
let body = typeway_grpc::framing::encode_grpc_frame(&req_bytes);
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/CreateUser"
))
.header("content-type", "application/grpc+json")
.header("te", "trailers")
.body(body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body_bytes = resp.bytes().await.unwrap();
let unframed = typeway_grpc::framing::decode_grpc_frame(&body_bytes).unwrap();
let user: User = serde_json::from_slice(unframed).unwrap();
assert_eq!(user.name, "Charlie");
assert_eq!(user.id, 3); }
async fn list_users_streaming(state: State<AppState>) -> GrpcStream<User> {
let users = state.0.lock().unwrap().clone();
let (tx, stream) = GrpcStream::channel(8);
tokio::spawn(async move {
for user in users {
if tx.send(user).await.is_err() {
break;
}
}
});
stream
}
type StreamingAPI = (
ServerStream<GetEndpoint<UsersPath, Vec<User>>>,
PostEndpoint<UsersPath, CreateUser, User>,
);
async fn start_streaming_server() -> u16 {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![
User {
id: 1,
name: "Alice".into(),
},
User {
id: 2,
name: "Bob".into(),
},
User {
id: 3,
name: "Charlie".into(),
},
]));
let server = Server::<StreamingAPI>::new((
bind::<_, _, _>(list_users_streaming),
bind::<_, _, _>(create_user),
))
.with_state(state)
.with_grpc("StreamService", "stream.v1");
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
port
}
#[tokio::test]
async fn grpc_stream_returns_individual_frames() {
let port = start_streaming_server().await;
let client = typeway_grpc::GrpcTestClient::new(&format!("http://127.0.0.1:{port}"));
let resp = client
.call_streaming_empty("stream.v1.StreamService", "ListUser")
.await;
assert!(resp.is_ok(), "expected OK, got {:?}", resp.grpc_code());
assert_eq!(
resp.len(),
3,
"expected 3 streamed items, got {}",
resp.len()
);
assert_eq!(resp.items[0]["name"], "Alice");
assert_eq!(resp.items[1]["name"], "Bob");
assert_eq!(resp.items[2]["name"], "Charlie");
}
#[tokio::test]
async fn grpc_client_unary_call() {
let port = start_native_grpc_server().await;
let client = typeway_grpc::GrpcClient::new(
&format!("http://127.0.0.1:{port}"),
"UserService",
"users.v1",
)
.unwrap();
let resp = client
.call("CreateUser", &serde_json::json!({"name": "Dave"}))
.await
.unwrap();
assert_eq!(resp["name"], "Dave");
assert_eq!(resp["id"], 3);
}
#[tokio::test]
async fn grpc_client_server_stream_call() {
let port = start_streaming_server().await;
let client = typeway_grpc::GrpcClient::new(
&format!("http://127.0.0.1:{port}"),
"StreamService",
"stream.v1",
)
.unwrap();
let stream = client
.call_server_stream("ListUser", &serde_json::json!({}))
.await
.unwrap();
let items = stream.collect().await.unwrap();
assert_eq!(items.len(), 3);
assert_eq!(items[0]["name"], "Alice");
assert_eq!(items[2]["name"], "Charlie");
}
#[tokio::test]
async fn grpc_client_unimplemented_error() {
let port = start_native_grpc_server().await;
let client = typeway_grpc::GrpcClient::new(
&format!("http://127.0.0.1:{port}"),
"UserService",
"users.v1",
)
.unwrap();
let result = client
.call("NonExistentMethod", &serde_json::json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
typeway_grpc::GrpcClientError::Status { code, .. } => {
assert_eq!(code, typeway_grpc::GrpcCode::Unimplemented);
}
other => panic!("expected Status error, got: {other}"),
}
}
#[test]
fn typeway_codec_adapter_with_derive() {
use typeway_grpc::{GrpcCodec, TypewayCodecAdapter};
use typeway_macros::TypewayCodec;
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, TypewayCodec)]
struct TestUser {
#[proto(tag = 1)]
id: u32,
#[proto(tag = 2)]
name: String,
#[proto(tag = 3)]
active: bool,
}
let adapter = TypewayCodecAdapter::<TestUser>::new();
let json = serde_json::json!({"id": 42, "name": "Alice", "active": true});
let encoded = adapter.encode(&json).unwrap();
let decoded = adapter.decode(&encoded).unwrap();
assert_eq!(decoded["id"], 42);
assert_eq!(decoded["name"], "Alice");
assert_eq!(decoded["active"], true);
assert_eq!(adapter.content_type(), "application/grpc+proto");
}
#[cfg(feature = "grpc-proto-binary")]
#[tokio::test]
async fn binary_protobuf_content_type_detection() {
let state: AppState = Arc::new(std::sync::Mutex::new(vec![User {
id: 1,
name: "Alice".into(),
}]));
let server = Server::<TestAPI>::new((
bind::<_, _, _>(list_users),
bind::<_, _, _>(get_user),
bind::<_, _, _>(create_user),
))
.with_state(state)
.with_grpc("UserService", "users.v1")
.with_proto_binary();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
server
.serve_with_shutdown(listener, std::future::pending())
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client = reqwest::Client::builder()
.http2_prior_knowledge()
.build()
.unwrap();
let json_body =
typeway_grpc::encode_grpc_frame(serde_json::json!({"name": "Dave"}).to_string().as_bytes());
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/CreateUser"
))
.header("content-type", "application/grpc+json")
.header("te", "trailers")
.body(json_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let grpc_status = resp
.headers()
.get("grpc-status")
.and_then(|v| v.to_str().ok())
.unwrap_or("?");
assert_eq!(grpc_status, "0", "JSON request should succeed");
let proto_bytes = vec![0x0A, 0x04, b'D', b'a', b'v', b'e']; let framed = typeway_grpc::encode_grpc_frame(&proto_bytes);
let resp = client
.post(format!(
"http://127.0.0.1:{port}/users.v1.UserService/CreateUser"
))
.header("content-type", "application/grpc") .header("te", "trailers")
.body(framed)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let grpc_status = resp
.headers()
.get("grpc-status")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(-1);
assert_ne!(grpc_status, -1, "grpc-status header should be present");
}