§tonic-mock
A comprehensive testing utility for tonic gRPC services in Rust.
This crate helps you test gRPC services with minimal effort by providing utilities to:
- Create and manipulate streaming requests and responses
- Process streaming responses with timeouts
- Test bidirectional streaming interactions
- Mock gRPC clients for isolated testing
- Add interceptors to modify requests
§Core Functionality
Seven main functions are provided:
- streaming_request: Build a streaming request from a vector of messages.
- streaming_request_with_interceptor: Build a streaming request and apply an interceptor function to it.
- request_with_interceptor: Create a standard (non-streaming) request and apply an interceptor to it.
- process_streaming_response: Iterate over the streaming response and call the user-provided closure on each message.
- process_streaming_response_with_timeout: Iterate over the streaming response with a timeout for each message.
- stream_to_vec: Iterate over the streaming response and collect it into a vector for further processing.
- stream_to_vec_with_timeout: Iterate over the streaming response with a timeout and collect it into a vector.
Additionally, BidirectionalStreamingTest provides utilities for fine-grained testing of bidirectional streaming services,
and the client_mock module allows mocking gRPC clients.
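The per-message timeout behind the `_with_timeout` variants can be modeled without tonic at all. The sketch below is a std-only illustration, not the crate's implementation: it uses a `std::sync::mpsc` channel in place of a gRPC stream, and `ItemError` and `collect_with_timeout` are hypothetical names. It also assumes that collection stops after the first timeout, which the crate documentation does not state.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for a gRPC status code; not a tonic type.
#[derive(Debug, PartialEq)]
enum ItemError {
    DeadlineExceeded,
}

/// Collect messages from `rx`, waiting at most `per_message` for each one.
/// A timeout is recorded as an error item and ends the collection
/// (an assumption of this sketch).
fn collect_with_timeout<T>(
    rx: &Receiver<T>,
    per_message: Duration,
) -> Vec<Result<T, ItemError>> {
    let mut out = Vec::new();
    loop {
        match rx.recv_timeout(per_message) {
            Ok(msg) => out.push(Ok(msg)),
            Err(RecvTimeoutError::Timeout) => {
                out.push(Err(ItemError::DeadlineExceeded));
                break;
            }
            // All senders dropped: the stream ended normally.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // The sender is still alive but silent, so the third wait times out.
    let got = collect_with_timeout(&rx, Duration::from_millis(20));
    assert_eq!(got, vec![Ok(1), Ok(2), Err(ItemError::DeadlineExceeded)]);
    drop(tx);
}
```

The timeout applies to each message separately, so a long stream that keeps producing messages never trips it; only a gap longer than the limit does.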
§Basic Example
use tonic::{Request, Response, Status};
use tonic_mock::{streaming_request, process_streaming_response};
#[tokio::test]
async fn service_push_works() -> Result<(), Box<dyn std::error::Error>> {
    // Create test data
    let events = vec![
        RequestPush::new("1", "data1"),
        RequestPush::new("2", "data2"),
        RequestPush::new("3", "data3"),
    ];
    // Create a streaming request
    let req = streaming_request(events);
    // Call your service
    let res = your_service.push(req).await?;
    // Process the streaming response
    process_streaming_response(res, |msg, i| {
        assert!(msg.is_ok());
        assert_eq!(msg.as_ref().unwrap().code, i as i32);
    })
    .await;
    Ok(())
}
§Testing Client Streaming
For client streaming (multiple requests, single response), use streaming_request to create a stream of messages:
use tonic_mock::streaming_request;
// Create a vector of messages to send
let messages = vec![
    StreamRequest { id: 1, data: "first".to_string() },
    StreamRequest { id: 2, data: "second".to_string() },
];
// Create the streaming request
let request = streaming_request(messages);
// Call your service
let response = your_service.client_streaming_method(request).await?;
// Check the response
let response = response.into_inner();
assert_eq!(response.count, 2);
§Testing Server Streaming
For server streaming (single request, multiple responses), use process_streaming_response or stream_to_vec:
use tonic::Request;
use tonic_mock::{process_streaming_response, stream_to_vec};
// Create the request
let request = Request::new(ServerStreamRequest { count: 3 });
// Call your service
let response = your_service.server_streaming_method(request).await?;
// Process responses with a callback
process_streaming_response(response, |msg, idx| {
    assert!(msg.is_ok());
    let response = msg.unwrap();
    assert_eq!(response.index, idx as i32);
}).await;
// Or convert the stream to a vector (the first response was consumed above, so call the service again)
let request = Request::new(ServerStreamRequest { count: 3 });
let response = your_service.server_streaming_method(request).await?;
let results = stream_to_vec(response).await;
assert_eq!(results.len(), 3);
for (i, result) in results.iter().enumerate() {
    assert!(result.is_ok());
    let response = result.as_ref().unwrap();
    assert_eq!(response.index, i as i32);
}
§Bidirectional Streaming
Use BidirectionalStreamingTest for testing bidirectional streaming services:
use std::time::Duration;
use tonic_mock::BidirectionalStreamingTest;
#[tokio::test]
async fn test_bidirectional_service() {
    // Create a test context with your service function
    let mut test = BidirectionalStreamingTest::new(my_bidirectional_service);
    // Send messages one by one
    test.send_client_message(TestRequest::new("msg1", "First message")).await;
    // Get response
    if let Some(response) = test.get_server_response().await {
        // Verify response
        assert_eq!(response.code, 200);
        assert!(response.message.contains("First message"));
    }
    // You can also use timeouts
    match test.get_server_response_with_timeout(Duration::from_millis(100)).await {
        Ok(Some(resp)) => {
            // Handle response
        },
        Ok(None) => {
            // No more responses
        },
        Err(status) => {
            // Timeout or other error
        },
    }
    // Complete the test when done
    test.complete().await;
}
§Mocking gRPC Clients
The client_mock module provides the MockableGrpcClient for mocking gRPC clients:
use tonic_mock::client_mock::{MockableGrpcClient, MockResponseDefinition, GrpcClientExt};
use tonic::{Request, Status, Code};
use prost::Message;
// Define a client type that will use the mock
#[derive(Clone)]
struct UserServiceClient<T> {
    inner: T,
}
// Implement the GrpcClientExt trait for your client
impl GrpcClientExt<UserServiceClient<MockableGrpcClient>> for UserServiceClient<MockableGrpcClient> {
    fn with_mock(mock: MockableGrpcClient) -> Self {
        Self { inner: mock }
    }
}
#[tokio::test]
async fn test_user_service_client() {
    // Create a mock client
    let mock = MockableGrpcClient::new();
    // Configure mock responses
    mock.mock::<UserRequest, UserResponse>("user.UserService", "GetUser")
        .respond_when(
            |req| req.user_id == "existing",
            MockResponseDefinition::ok(UserResponse {
                name: "Existing User".to_string(),
            })
        )
        .await
        .respond_with(
            MockResponseDefinition::err(Status::new(Code::NotFound, "User not found"))
        )
        .await;
    // Create a client with the mock
    let mut client = UserServiceClient::with_mock(mock.clone());
    // Make a request
    let request = Request::new(UserRequest { user_id: "existing".to_string() });
    let response = client.get_user(request).await.unwrap();
    // Verify the response
    assert_eq!(response.get_ref().name, "Existing User");
    // Reset mock when done
    mock.reset().await;
}
§Request Interceptors
Use interceptors to modify requests before they are sent:
use tonic::metadata::MetadataValue;
use tonic_mock::streaming_request_with_interceptor;
// Create a streaming request with an interceptor
let request = streaming_request_with_interceptor(messages, |req| {
    // Add authentication header
    req.metadata_mut().insert(
        "authorization",
        MetadataValue::from_static("Bearer token123")
    );
    // Add tracing header
    req.metadata_mut().insert(
        "x-request-id",
        MetadataValue::from_static("trace-456")
    );
});
§Timeout Support
Handle timeouts in streaming responses:
use std::time::Duration;
use tonic_mock::process_streaming_response_with_timeout;
// Process response with a 1-second timeout for each message
process_streaming_response_with_timeout(
    response,
    Duration::from_secs(1),
    |msg, idx| {
        if msg.is_ok() {
            // Handle successful message
            let response = msg.as_ref().unwrap();
        } else {
            // Handle error (could be timeout or other error)
            let error = msg.as_ref().err().unwrap();
            if error.code() == tonic::Code::DeadlineExceeded {
                println!("Timeout occurred: {}", error.message());
            }
        }
    }
).await;
§Test Utilities
The crate provides optional test utilities with the test-utils feature (enabled by default):
- TestRequest and TestResponse: Simple message types for testing
- create_test_messages: Generate test messages with sequential IDs
- create_stream_response: Create a streaming response from a vector of messages
- create_stream_response_with_errors: Create a streaming response with errors at specified indices
- assert_message_eq: Assert that a message matches expected values
- assert_response_eq: Assert that a response matches expected values
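To make "sequential IDs" and "errors at specified indices" concrete, here is a small std-only model of the idea. It does not use the crate's types or functions: `Msg`, `sequential_messages`, and `with_errors_at` are hypothetical names, and the sketch assumes an error replaces the message at the given index rather than being inserted next to it.

```rust
/// Hypothetical message type with a sequential ID (not the crate's TestRequest).
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    id: u32,
    data: String,
}

/// Generate `count` messages with IDs 0, 1, 2, ...
fn sequential_messages(count: u32) -> Vec<Msg> {
    (0..count)
        .map(|id| Msg { id, data: format!("data-{}", id) })
        .collect()
}

/// Wrap each message in `Ok`, except at the given indices, which become `Err`.
fn with_errors_at(messages: Vec<Msg>, error_indices: &[usize]) -> Vec<Result<Msg, String>> {
    messages
        .into_iter()
        .enumerate()
        .map(|(i, msg)| {
            if error_indices.contains(&i) {
                Err(format!("injected error at index {}", i))
            } else {
                Ok(msg)
            }
        })
        .collect()
}

fn main() {
    let items = with_errors_at(sequential_messages(4), &[1, 3]);
    assert_eq!(items.len(), 4);
    assert!(items[0].is_ok());
    assert!(items[1].is_err());
    assert_eq!(items[2].as_ref().unwrap().id, 2);
    assert_eq!(items[2].as_ref().unwrap().data, "data-2");
    assert!(items[3].is_err());
}
```

A fixture shaped like this lets a test check that the code under test keeps going, or stops, when an error appears in the middle of a stream.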
§gRPC Mock Utilities
The grpc_mock module provides low-level utilities for mocking gRPC messages:
- encode_grpc_request: Encode a request message
- encode_grpc_response: Encode a response message
- decode_grpc_message: Decode a gRPC message
- mock_grpc_call: Mock a gRPC call with a handler function
- create_grpc_uri: Create a gRPC URI for a service method
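For background on what encoding a gRPC message involves, the sketch below shows the standard gRPC length-prefixed framing: a 1-byte compression flag, a 4-byte big-endian length, then the serialized payload. It is a std-only illustration under the assumption that the module's helpers follow this wire format. `frame_grpc_message`, `unframe_grpc_message`, and `grpc_path` are hypothetical names and not part of tonic-mock, and the payload here is raw bytes rather than a prost-encoded message.

```rust
/// Frame an already-serialized message in gRPC's length-prefixed format.
fn frame_grpc_message(payload: &[u8]) -> Vec<u8> {
    let len = u32::try_from(payload.len()).expect("payload exceeds 4 GiB");
    let mut framed = Vec::with_capacity(5 + payload.len());
    framed.push(0); // compression flag: 0 = uncompressed
    framed.extend_from_slice(&len.to_be_bytes());
    framed.extend_from_slice(payload);
    framed
}

/// Return the payload of the first frame, or `None` if the buffer is
/// truncated or the frame is marked as compressed.
fn unframe_grpc_message(framed: &[u8]) -> Option<&[u8]> {
    if framed.len() < 5 || framed[0] != 0 {
        return None;
    }
    let len = u32::from_be_bytes([framed[1], framed[2], framed[3], framed[4]]) as usize;
    framed.get(5..5 + len)
}

/// Build the request path gRPC uses for a method: "/{service}/{method}".
fn grpc_path(service: &str, method: &str) -> String {
    format!("/{}/{}", service, method)
}

fn main() {
    let framed = frame_grpc_message(b"hello");
    assert_eq!(framed, [0, 0, 0, 0, 5, b'h', b'e', b'l', b'l', b'o']);
    assert_eq!(unframe_grpc_message(&framed), Some(&b"hello"[..]));
    // A truncated frame is rejected rather than partially decoded.
    assert_eq!(unframe_grpc_message(&framed[..7]), None);
    assert_eq!(grpc_path("user.UserService", "GetUser"), "/user.UserService/GetUser");
}
```

The service and method names in the usage lines match the `"user.UserService"` and `"GetUser"` strings from the client mocking example; whether create_grpc_uri returns only this path or a full URI with an authority is not stated in this page.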
Re-exports§
pub use client_mock::GrpcClientExt;
pub use client_mock::MockResponseDefinition;
pub use client_mock::MockableGrpcClient;
pub use test_utils::*;
Modules§
- client_mock - Client Mocking
- grpc_mock - gRPC Mock Utilities
- test_utils - Test Utilities
Structs§
- BidirectionalStreamingTest - A bidirectional streaming test context that allows controlled message exchange
- MockBody
- ProstDecoder - A Decoder that knows how to decode U.
Functions§
- process_streaming_response - A simple wrapper to process and validate a streaming response
- process_streaming_response_with_timeout - Process a streaming response with a configurable timeout
- request_with_interceptor - Create a regular (non-streaming) request with an interceptor
- stream_to_vec - Convert a streaming response to a Vec for simplified testing
- stream_to_vec_with_timeout - Convert a streaming response to a Vec with timeout support
- streaming_request - Generate a streaming request for gRPC
- streaming_request_with_interceptor - Generate a streaming request for gRPC with an interceptor
Type Aliases§
- RequestInterceptor - Type alias for a request interceptor function
- StreamResponse - Type alias for convenience
- StreamResponseInner - Type alias for convenience