Crate tonic_mock

§tonic-mock

A comprehensive testing utility for tonic gRPC services in Rust.

This crate helps you test gRPC services with minimal effort by providing utilities to:

  • Create and manipulate streaming requests and responses
  • Process streaming responses with timeouts
  • Test bidirectional streaming interactions
  • Mock gRPC clients for isolated testing
  • Add interceptors to modify requests

§Core Functionality

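Seven main functions are provided:

  • streaming_request: generate a streaming request for gRPC
  • streaming_request_with_interceptor: generate a streaming request for gRPC with an interceptor
  • request_with_interceptor: create a regular (non-streaming) request with an interceptor
  • process_streaming_response: process and validate a streaming response
  • process_streaming_response_with_timeout: process a streaming response with a configurable timeout
  • stream_to_vec: convert a streaming response to a Vec for simplified testing
  • stream_to_vec_with_timeout: convert a streaming response to a Vec with timeout support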

Additionally, BidirectionalStreamingTest provides utilities for fine-grained testing of bidirectional streaming services, and the client_mock module allows mocking gRPC clients.

§Basic Example

use tonic::{Request, Response, Status};
use tonic_mock::{streaming_request, process_streaming_response};

#[tokio::test]
async fn service_push_works() -> Result<(), Box<dyn std::error::Error>> {
    // Create test data
    let events = vec![
        RequestPush::new("1", "data1"),
        RequestPush::new("2", "data2"),
        RequestPush::new("3", "data3"),
    ];

    // Create a streaming request
    let req = streaming_request(events);

    // Call your service
    let res = your_service.push(req).await?;

    // Process the streaming response
    process_streaming_response(res, |msg, i| {
        assert!(msg.is_ok());
        assert_eq!(msg.as_ref().unwrap().code, i as i32);
    })
    .await;

    Ok(())
}

§Testing Client Streaming

For client streaming (multiple requests, single response), use streaming_request to create a stream of messages:

use tonic_mock::streaming_request;

// Create a vector of messages to send
let messages = vec![
    StreamRequest { id: 1, data: "first".to_string() },
    StreamRequest { id: 2, data: "second".to_string() },
];

// Create the streaming request
let request = streaming_request(messages);

// Call your service
let response = your_service.client_streaming_method(request).await?;

// Check the response
let response = response.into_inner();
assert_eq!(response.count, 2);

§Testing Server Streaming

For server streaming (single request, multiple responses), use process_streaming_response or stream_to_vec:

use tonic::Request;
use tonic_mock::{process_streaming_response, stream_to_vec};

// Create the request
let request = Request::new(ServerStreamRequest { count: 3 });

// Call your service
let response = your_service.server_streaming_method(request).await?;

// Process responses with a callback
process_streaming_response(response, |msg, idx| {
    assert!(msg.is_ok());
    let response = msg.unwrap();
    assert_eq!(response.index, idx as i32);
}).await;

// Or convert the stream to a vector
let request = Request::new(ServerStreamRequest { count: 3 });
let response = your_service.server_streaming_method(request).await?;
let results = stream_to_vec(response).await;

assert_eq!(results.len(), 3);
for (i, result) in results.iter().enumerate() {
    assert!(result.is_ok());
    let response = result.as_ref().unwrap();
    assert_eq!(response.index, i as i32);
}
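When a test only cares that every message succeeded, the per-item loop above can be collapsed with the standard library's `collect` into `Result`. The following is a minimal std-only sketch; `all_ok` is a hypothetical helper, not part of tonic-mock, and it is generic over the error type so it does not depend on `tonic::Status`.

```rust
/// Hypothetical helper (not part of tonic-mock): turns a list of per-message
/// results into a single result holding either every value or the first error.
pub fn all_ok<T, E>(results: Vec<Result<T, E>>) -> Result<Vec<T>, E> {
    // `collect` on an iterator of `Result`s stops at the first `Err`.
    results.into_iter().collect()
}
```

Applied to the vector returned by stream_to_vec, this would let a test unwrap once and then compare the plain values.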

§Bidirectional Streaming

Use BidirectionalStreamingTest for testing bidirectional streaming services:

use std::time::Duration;
use tonic_mock::BidirectionalStreamingTest;

#[tokio::test]
async fn test_bidirectional_service() {
    // Create a test context with your service function
    let mut test = BidirectionalStreamingTest::new(my_bidirectional_service);

    // Send messages one by one
    test.send_client_message(TestRequest::new("msg1", "First message")).await;

    // Get response
    if let Some(response) = test.get_server_response().await {
        // Verify response
        assert_eq!(response.code, 200);
        assert!(response.message.contains("First message"));
    }

    // You can also use timeouts
    match test.get_server_response_with_timeout(Duration::from_millis(100)).await {
        Ok(Some(_resp)) => {
            // Handle response
        }
        Ok(None) => {
            // No more responses
        }
        Err(_status) => {
            // Timeout or other error
        }
    }

    // Complete the test when done
    test.complete().await;
}

§Mocking gRPC Clients

The client_mock module provides the MockableGrpcClient for mocking gRPC clients:

use tonic_mock::client_mock::{MockableGrpcClient, MockResponseDefinition, GrpcClientExt};
use tonic::{Request, Status, Code};
use prost::Message;

// Define a client type that will use the mock
#[derive(Clone)]
struct UserServiceClient<T> {
    inner: T,
}

// Implement the GrpcClientExt trait for your client
impl GrpcClientExt<UserServiceClient<MockableGrpcClient>> for UserServiceClient<MockableGrpcClient> {
    fn with_mock(mock: MockableGrpcClient) -> Self {
        Self { inner: mock }
    }
}

#[tokio::test]
async fn test_user_service_client() {
    // Create a mock client
    let mock = MockableGrpcClient::new();

    // Configure mock responses
    mock.mock::<UserRequest, UserResponse>("user.UserService", "GetUser")
        .respond_when(
            |req| req.user_id == "existing",
            MockResponseDefinition::ok(UserResponse {
                name: "Existing User".to_string(),
            })
        )
        .await
        .respond_with(
            MockResponseDefinition::err(Status::new(Code::NotFound, "User not found"))
        )
        .await;

    // Create a client with the mock
    let mut client = UserServiceClient::with_mock(mock.clone());

    // Make a request (get_user is assumed to be defined on UserServiceClient;
    // its implementation is not shown here)
    let request = Request::new(UserRequest { user_id: "existing".to_string() });
    let response = client.get_user(request).await.unwrap();

    // Verify the response
    assert_eq!(response.get_ref().name, "Existing User");

    // Reset mock when done
    mock.reset().await;
}
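The example above relies on the conditional response taking precedence over the default one: the "existing" request gets the OK response even though an error default is also configured. A minimal std-only sketch of that matching order might look like the following. `MockMethod` and its `call` method are hypothetical names used for illustration, errors are plain strings rather than `tonic::Status`, and this is not the crate's actual implementation.

```rust
/// Hypothetical model of one mocked method (not tonic-mock's real type):
/// conditional responses are tried in registration order, then the default.
pub struct MockMethod<Req, Resp> {
    conditional: Vec<(Box<dyn Fn(&Req) -> bool>, Result<Resp, String>)>,
    default: Option<Result<Resp, String>>,
}

impl<Req, Resp: Clone> MockMethod<Req, Resp> {
    pub fn new() -> Self {
        Self { conditional: Vec::new(), default: None }
    }

    /// Registers a response returned only when `pred` accepts the request.
    pub fn respond_when<P>(mut self, pred: P, resp: Result<Resp, String>) -> Self
    where
        P: Fn(&Req) -> bool + 'static,
    {
        self.conditional.push((Box::new(pred), resp));
        self
    }

    /// Registers the fallback response used when no predicate matches.
    pub fn respond_with(mut self, resp: Result<Resp, String>) -> Self {
        self.default = Some(resp);
        self
    }

    /// Resolves a request: first matching predicate wins, else the default.
    pub fn call(&self, req: &Req) -> Result<Resp, String> {
        for (pred, resp) in &self.conditional {
            if pred(req) {
                return resp.clone();
            }
        }
        self.default
            .clone()
            .unwrap_or_else(|| Err("no mock configured".to_string()))
    }
}
```

In this model, a request that satisfies no predicate and has no default yields an error; how MockableGrpcClient behaves in that case is not stated here.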

§Request Interceptors

Use interceptors to modify requests before they are sent:

use tonic::metadata::MetadataValue;
use tonic_mock::streaming_request_with_interceptor;

// Create a streaming request with an interceptor
let request = streaming_request_with_interceptor(messages, |req| {
    // Add authentication header
    req.metadata_mut().insert(
        "authorization",
        MetadataValue::from_static("Bearer token123")
    );

    // Add tracing header
    req.metadata_mut().insert(
        "x-request-id",
        MetadataValue::from_static("trace-456")
    );
});
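Conceptually, an interceptor is a closure that receives a mutable request and edits it before the request is handed to the service. The sketch below models that idea with the standard library only. `FakeRequest` and `request_with` are hypothetical stand-ins for `tonic::Request` and the crate's `*_with_interceptor` functions, and a `HashMap` stands in for gRPC metadata.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a request: a payload plus string metadata.
#[derive(Debug)]
pub struct FakeRequest<T> {
    pub metadata: HashMap<String, String>,
    pub payload: T,
}

/// Builds a request, then lets the interceptor mutate it before returning it.
pub fn request_with<T, F>(payload: T, interceptor: F) -> FakeRequest<T>
where
    F: FnOnce(&mut FakeRequest<T>),
{
    let mut req = FakeRequest { metadata: HashMap::new(), payload };
    interceptor(&mut req);
    req
}
```

Because the interceptor runs before the request is returned, the service under test sees the headers as if a real client had set them.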

§Timeout Support

Handle timeouts in streaming responses:

use std::time::Duration;
use tonic_mock::process_streaming_response_with_timeout;

// Process response with a 1-second timeout for each message
process_streaming_response_with_timeout(
    response,
    Duration::from_secs(1),
    |msg, _idx| match msg {
        Ok(_response) => {
            // Handle successful message
        }
        Err(error) => {
            // Handle error (could be a timeout or another error)
            if error.code() == tonic::Code::DeadlineExceeded {
                println!("Timeout occurred: {}", error.message());
            }
        }
    }
).await;
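The timeout above applies to each message rather than to the stream as a whole. As an analogy only, the same per-message deadline can be modeled with a standard-library channel and `recv_timeout`. `drain_with_timeout` is a hypothetical function, the string error stands in for a `DeadlineExceeded` status, and stopping after the first timeout is an assumption of this sketch rather than documented crate behavior.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Drains `rx`, giving every message its own `per_message` deadline.
/// A timed-out wait is recorded as an error and ends the loop; a closed
/// channel ends the loop without an error.
pub fn drain_with_timeout<T>(
    rx: &Receiver<T>,
    per_message: Duration,
) -> Vec<Result<T, &'static str>> {
    let mut out = Vec::new();
    loop {
        match rx.recv_timeout(per_message) {
            Ok(item) => out.push(Ok(item)),
            Err(RecvTimeoutError::Timeout) => {
                out.push(Err("deadline exceeded"));
                break;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    out
}
```

The deadline restarts for each message, so a slow but steady producer never trips it; only a gap longer than `per_message` does.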

§Test Utilities

The crate provides optional test utilities behind the test-utils feature (enabled by default).

§gRPC Mock Utilities

The grpc_mock module provides low-level utilities for mocking gRPC messages.

Re-exports§

pub use client_mock::GrpcClientExt;
pub use client_mock::MockResponseDefinition;
pub use client_mock::MockableGrpcClient;
pub use test_utils::*;

Modules§

client_mock
Client Mocking
grpc_mock
gRPC Mock Utilities
test_utils
Test Utilities

Structs§

BidirectionalStreamingTest
A bidirectional streaming test context that allows controlled message exchange
MockBody
ProstDecoder
A Decoder that knows how to decode U.

Functions§

process_streaming_response
A simple wrapper to process and validate a streaming response
process_streaming_response_with_timeout
Process a streaming response with a configurable timeout
request_with_interceptor
Create a regular (non-streaming) request with an interceptor
stream_to_vec
Convert a streaming response to a Vec for simplified testing
stream_to_vec_with_timeout
Convert a streaming response to a Vec with timeout support
streaming_request
Generate a streaming request for gRPC
streaming_request_with_interceptor
Generate a streaming request for gRPC with an interceptor

Type Aliases§

RequestInterceptor
Type alias for a request interceptor function
StreamResponse
Type alias for convenience
StreamResponseInner
Type alias for convenience