use bytes::Bytes;
use futures_util::StreamExt;
use std::future::Future;
use std::pin::Pin;
use tonic::metadata::MetadataMap;
use super::streaming::MessageStream;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcMode {
Unary,
ServerStreaming,
ClientStreaming,
BidirectionalStreaming,
}
#[derive(Debug, Clone)]
pub struct GrpcRequestData {
pub service_name: String,
pub method_name: String,
pub payload: Bytes,
pub metadata: MetadataMap,
}
#[derive(Debug, Clone)]
pub struct GrpcResponseData {
pub payload: Bytes,
pub metadata: MetadataMap,
}
pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
pub trait GrpcHandler: Send + Sync {
fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send + '_>>;
fn service_name(&self) -> &str;
fn rpc_mode(&self) -> RpcMode {
RpcMode::Unary
}
fn call_server_stream(
&self,
request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send + '_>> {
let unary_future = self.call(request);
Box::pin(async move {
let response = unary_future.await?;
Ok(crate::grpc::streaming::single_message_stream(response.payload))
})
}
fn call_client_stream(
&self,
request: crate::grpc::streaming::StreamingRequest,
) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send + '_>> {
Box::pin(async move {
let crate::grpc::streaming::StreamingRequest {
service_name,
method_name,
mut message_stream,
metadata,
} = request;
let first_message = match message_stream.next().await {
Some(Ok(message)) => message,
Some(Err(status)) => return Err(status),
None => {
return Err(tonic::Status::invalid_argument(
"Client stream is empty; unary fallback requires exactly one request message",
));
}
};
if let Some(next_message) = message_stream.next().await {
match next_message {
Ok(_) => {
return Err(tonic::Status::invalid_argument(
"Unary fallback requires exactly one request message",
));
}
Err(status) => return Err(status),
}
}
self.call(GrpcRequestData {
service_name,
method_name,
payload: first_message,
metadata,
})
.await
})
}
fn call_bidi_stream(
&self,
request: crate::grpc::streaming::StreamingRequest,
) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send + '_>> {
Box::pin(async move {
let response = self.call_client_stream(request).await?;
Ok(crate::grpc::streaming::single_message_stream(response.payload))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
struct TestGrpcHandler;
impl GrpcHandler for TestGrpcHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::from("test response"),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.TestService"
}
}
#[tokio::test]
async fn test_grpc_handler_basic_call() {
let handler = TestGrpcHandler;
let request = GrpcRequestData {
service_name: "test.TestService".to_string(),
method_name: "TestMethod".to_string(),
payload: Bytes::from("test payload"),
metadata: MetadataMap::new(),
};
let result = handler.call(request).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.payload, Bytes::from("test response"));
}
#[test]
fn test_grpc_handler_service_name() {
let handler = TestGrpcHandler;
assert_eq!(handler.service_name(), "test.TestService");
}
#[test]
fn test_grpc_handler_default_rpc_mode() {
let handler = TestGrpcHandler;
assert_eq!(handler.rpc_mode(), RpcMode::Unary);
}
#[tokio::test]
async fn test_grpc_handler_default_server_stream_falls_back_to_unary() {
let handler = TestGrpcHandler;
let request = GrpcRequestData {
service_name: "test.TestService".to_string(),
method_name: "StreamMethod".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await.unwrap();
let collected: Vec<_> = result.collect().await;
assert_eq!(collected.len(), 1);
assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
}
#[tokio::test]
async fn test_grpc_handler_default_client_stream_falls_back_to_unary() {
let handler = TestGrpcHandler;
let request = crate::grpc::streaming::StreamingRequest {
service_name: "test.TestService".to_string(),
method_name: "ClientStreamMethod".to_string(),
message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("one")]),
metadata: MetadataMap::new(),
};
let result = handler.call_client_stream(request).await.unwrap();
assert_eq!(result.payload, Bytes::from("test response"));
}
#[tokio::test]
async fn test_grpc_handler_default_client_stream_requires_single_message() {
let handler = TestGrpcHandler;
let request = crate::grpc::streaming::StreamingRequest {
service_name: "test.TestService".to_string(),
method_name: "ClientStreamMethod".to_string(),
message_stream: crate::grpc::streaming::message_stream_from_vec(vec![
Bytes::from("one"),
Bytes::from("two"),
]),
metadata: MetadataMap::new(),
};
let error = handler.call_client_stream(request).await.unwrap_err();
assert_eq!(error.code(), tonic::Code::InvalidArgument);
assert!(error.message().contains("exactly one"));
}
#[tokio::test]
async fn test_grpc_handler_default_bidi_stream_falls_back_to_unary() {
let handler = TestGrpcHandler;
let request = crate::grpc::streaming::StreamingRequest {
service_name: "test.TestService".to_string(),
method_name: "BidiMethod".to_string(),
message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("ping")]),
metadata: MetadataMap::new(),
};
let stream = handler.call_bidi_stream(request).await.unwrap();
let collected: Vec<_> = stream.collect().await;
assert_eq!(collected.len(), 1);
assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
}
#[test]
fn test_grpc_request_data_creation() {
let request = GrpcRequestData {
service_name: "mypackage.MyService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from("payload"),
metadata: MetadataMap::new(),
};
assert_eq!(request.service_name, "mypackage.MyService");
assert_eq!(request.method_name, "GetUser");
assert_eq!(request.payload, Bytes::from("payload"));
}
#[test]
fn test_grpc_response_data_creation() {
let response = GrpcResponseData {
payload: Bytes::from("response"),
metadata: MetadataMap::new(),
};
assert_eq!(response.payload, Bytes::from("response"));
assert!(response.metadata.is_empty());
}
#[test]
fn test_grpc_request_data_clone() {
let original = GrpcRequestData {
service_name: "test.Service".to_string(),
method_name: "Method".to_string(),
payload: Bytes::from("data"),
metadata: MetadataMap::new(),
};
let cloned = original.clone();
assert_eq!(original.service_name, cloned.service_name);
assert_eq!(original.method_name, cloned.method_name);
assert_eq!(original.payload, cloned.payload);
}
#[test]
fn test_grpc_response_data_clone() {
let original = GrpcResponseData {
payload: Bytes::from("response data"),
metadata: MetadataMap::new(),
};
let cloned = original.clone();
assert_eq!(original.payload, cloned.payload);
}
#[tokio::test]
async fn test_grpc_handler_error_response() {
struct ErrorHandler;
impl GrpcHandler for ErrorHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async { Err(tonic::Status::not_found("Resource not found")) })
}
fn service_name(&self) -> &str {
"test.ErrorService"
}
}
let handler = ErrorHandler;
let request = GrpcRequestData {
service_name: "test.ErrorService".to_string(),
method_name: "ErrorMethod".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call(request).await;
assert!(result.is_err());
let error = result.unwrap_err();
assert_eq!(error.code(), tonic::Code::NotFound);
assert_eq!(error.message(), "Resource not found");
}
#[tokio::test]
async fn test_server_stream_with_multiple_messages() {
use futures_util::StreamExt;
struct ServerStreamHandler;
impl GrpcHandler for ServerStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::from("unary response"),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.StreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async {
let messages = vec![
Bytes::from("message1"),
Bytes::from("message2"),
Bytes::from("message3"),
];
Ok(super::super::streaming::message_stream_from_vec(messages))
})
}
}
let handler = ServerStreamHandler;
let request = GrpcRequestData {
service_name: "test.StreamService".to_string(),
method_name: "StreamMethod".to_string(),
payload: Bytes::from("request data"),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let msg1 = stream.next().await.unwrap().unwrap();
assert_eq!(msg1, Bytes::from("message1"));
let msg2 = stream.next().await.unwrap().unwrap();
assert_eq!(msg2, Bytes::from("message2"));
let msg3 = stream.next().await.unwrap().unwrap();
assert_eq!(msg3, Bytes::from("message3"));
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_server_stream_empty_stream() {
use futures_util::StreamExt;
struct EmptyStreamHandler;
impl GrpcHandler for EmptyStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.EmptyStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async { Ok(super::super::streaming::empty_message_stream()) })
}
}
let handler = EmptyStreamHandler;
let request = GrpcRequestData {
service_name: "test.EmptyStreamService".to_string(),
method_name: "EmptyStream".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_server_stream_with_error_mid_stream() {
use futures_util::StreamExt;
struct ErrorMidStreamHandler;
impl GrpcHandler for ErrorMidStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.ErrorMidStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async {
let messages = vec![
Ok(Bytes::from("message1")),
Ok(Bytes::from("message2")),
Err(tonic::Status::internal("Stream error")),
];
let stream: MessageStream = Box::pin(futures_util::stream::iter(messages));
Ok(stream)
})
}
}
let handler = ErrorMidStreamHandler;
let request = GrpcRequestData {
service_name: "test.ErrorMidStreamService".to_string(),
method_name: "ErrorStream".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let msg1 = stream.next().await.unwrap().unwrap();
assert_eq!(msg1, Bytes::from("message1"));
let msg2 = stream.next().await.unwrap().unwrap();
assert_eq!(msg2, Bytes::from("message2"));
let error_result = stream.next().await.unwrap();
assert!(error_result.is_err());
let error = error_result.unwrap_err();
assert_eq!(error.code(), tonic::Code::Internal);
assert_eq!(error.message(), "Stream error");
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_server_stream_returns_error() {
struct FailingStreamHandler;
impl GrpcHandler for FailingStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.FailingStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async { Err(tonic::Status::unavailable("Stream unavailable")) })
}
}
let handler = FailingStreamHandler;
let request = GrpcRequestData {
service_name: "test.FailingStreamService".to_string(),
method_name: "FailingStream".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_err());
if let Err(error) = result {
assert_eq!(error.code(), tonic::Code::Unavailable);
assert_eq!(error.message(), "Stream unavailable");
} else {
panic!("Expected error");
}
}
#[tokio::test]
async fn test_server_stream_with_metadata() {
use futures_util::StreamExt;
struct MetadataStreamHandler;
impl GrpcHandler for MetadataStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.MetadataStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async move {
assert!(!request.metadata.is_empty());
let messages = vec![Bytes::from("metadata_message")];
Ok(super::super::streaming::message_stream_from_vec(messages))
})
}
}
let handler = MetadataStreamHandler;
let mut metadata = MetadataMap::new();
metadata.insert(
"x-request-id",
"test-request-123"
.parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
.unwrap(),
);
let request = GrpcRequestData {
service_name: "test.MetadataStreamService".to_string(),
method_name: "MetadataStream".to_string(),
payload: Bytes::new(),
metadata,
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg, Bytes::from("metadata_message"));
}
#[tokio::test]
async fn test_server_stream_large_stream_100_messages() {
use futures_util::StreamExt;
struct LargeStreamHandler;
impl GrpcHandler for LargeStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.LargeStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async {
let mut messages = Vec::new();
for i in 0..100 {
messages.push(Bytes::from(format!("message_{}", i)));
}
Ok(super::super::streaming::message_stream_from_vec(messages))
})
}
}
let handler = LargeStreamHandler;
let request = GrpcRequestData {
service_name: "test.LargeStreamService".to_string(),
method_name: "LargeStream".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
for i in 0..100 {
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg, Bytes::from(format!("message_{}", i)));
}
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_server_stream_large_stream_500_messages() {
use futures_util::StreamExt;
struct VeryLargeStreamHandler;
impl GrpcHandler for VeryLargeStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.VeryLargeStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async {
let mut messages = Vec::new();
for i in 0..500 {
messages.push(Bytes::from(format!("msg_{}", i)));
}
Ok(super::super::streaming::message_stream_from_vec(messages))
})
}
}
let handler = VeryLargeStreamHandler;
let request = GrpcRequestData {
service_name: "test.VeryLargeStreamService".to_string(),
method_name: "VeryLargeStream".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let mut count = 0;
while let Some(item) = stream.next().await {
let msg = item.unwrap();
assert_eq!(msg, Bytes::from(format!("msg_{}", count)));
count += 1;
}
assert_eq!(count, 500);
}
#[test]
fn test_rpc_mode_unary() {
let handler = TestGrpcHandler;
assert_eq!(handler.rpc_mode(), RpcMode::Unary);
}
#[test]
fn test_rpc_mode_server_streaming() {
struct ServerStreamTestHandler;
impl GrpcHandler for ServerStreamTestHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.ServerStreamTestService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
}
let handler = ServerStreamTestHandler;
assert_eq!(handler.rpc_mode(), RpcMode::ServerStreaming);
}
#[test]
fn test_rpc_mode_client_streaming() {
struct ClientStreamTestHandler;
impl GrpcHandler for ClientStreamTestHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.ClientStreamTestService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ClientStreaming
}
}
let handler = ClientStreamTestHandler;
assert_eq!(handler.rpc_mode(), RpcMode::ClientStreaming);
}
#[test]
fn test_rpc_mode_bidirectional_streaming() {
struct BiDirectionalStreamTestHandler;
impl GrpcHandler for BiDirectionalStreamTestHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.BiDirectionalStreamTestService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::BidirectionalStreaming
}
}
let handler = BiDirectionalStreamTestHandler;
assert_eq!(handler.rpc_mode(), RpcMode::BidirectionalStreaming);
}
#[tokio::test]
async fn test_server_stream_single_message() {
use futures_util::StreamExt;
struct SingleMessageStreamHandler;
impl GrpcHandler for SingleMessageStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.SingleMessageStreamService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async {
Ok(super::super::streaming::single_message_stream(Bytes::from(
"single_msg",
)))
})
}
}
let handler = SingleMessageStreamHandler;
let request = GrpcRequestData {
service_name: "test.SingleMessageStreamService".to_string(),
method_name: "SingleMessage".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg, Bytes::from("single_msg"));
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn test_server_stream_preserves_request_data() {
use futures_util::StreamExt;
struct RequestPreservingStreamHandler;
impl GrpcHandler for RequestPreservingStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.RequestPreservingService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
Box::pin(async move {
assert_eq!(request.service_name, "test.RequestPreservingService");
assert_eq!(request.method_name, "PreserveTest");
assert_eq!(request.payload, Bytes::from("test_payload"));
let messages = vec![Bytes::from("response")];
Ok(super::super::streaming::message_stream_from_vec(messages))
})
}
}
let handler = RequestPreservingStreamHandler;
let request = GrpcRequestData {
service_name: "test.RequestPreservingService".to_string(),
method_name: "PreserveTest".to_string(),
payload: Bytes::from("test_payload"),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_ok());
let mut stream = result.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(msg, Bytes::from("response"));
}
#[tokio::test]
async fn test_server_stream_with_various_error_codes() {
struct ErrorCodeStreamHandler {
error_code: tonic::Code,
}
impl GrpcHandler for ErrorCodeStreamHandler {
fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async {
Ok(GrpcResponseData {
payload: Bytes::new(),
metadata: MetadataMap::new(),
})
})
}
fn service_name(&self) -> &str {
"test.ErrorCodeService"
}
fn rpc_mode(&self) -> RpcMode {
RpcMode::ServerStreaming
}
fn call_server_stream(
&self,
_request: GrpcRequestData,
) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
let code = self.error_code;
Box::pin(async move {
match code {
tonic::Code::InvalidArgument => Err(tonic::Status::invalid_argument("Invalid argument")),
tonic::Code::FailedPrecondition => {
Err(tonic::Status::failed_precondition("Failed precondition"))
}
tonic::Code::PermissionDenied => Err(tonic::Status::permission_denied("Permission denied")),
_ => Err(tonic::Status::internal("Internal error")),
}
})
}
}
let handler = ErrorCodeStreamHandler {
error_code: tonic::Code::InvalidArgument,
};
let request = GrpcRequestData {
service_name: "test.ErrorCodeService".to_string(),
method_name: "Error".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_err());
if let Err(error) = result {
assert_eq!(error.code(), tonic::Code::InvalidArgument);
}
let handler = ErrorCodeStreamHandler {
error_code: tonic::Code::FailedPrecondition,
};
let request = GrpcRequestData {
service_name: "test.ErrorCodeService".to_string(),
method_name: "Error".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_err());
if let Err(error) = result {
assert_eq!(error.code(), tonic::Code::FailedPrecondition);
}
let handler = ErrorCodeStreamHandler {
error_code: tonic::Code::PermissionDenied,
};
let request = GrpcRequestData {
service_name: "test.ErrorCodeService".to_string(),
method_name: "Error".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call_server_stream(request).await;
assert!(result.is_err());
if let Err(error) = result {
assert_eq!(error.code(), tonic::Code::PermissionDenied);
}
}
}