use prost::Message;
use tonic::{Request, Response, Status, Streaming};
use tonic_mock::{
StreamResponseInner, stream_to_vec, streaming_request,
test_utils::{TestRequest, TestResponse},
};
/// Request message sent to the echo service in the first example.
///
/// Encoding and decoding come from the derived `prost::Message`
/// implementation; both fields are protobuf `string` fields.
#[derive(Clone, PartialEq, Message)]
pub struct ExampleRequest {
/// User the message comes from (protobuf tag 1); `echo_service` quotes it
/// back in its reply text.
#[prost(string, tag = "1")]
pub user_id: String,
/// Message text (protobuf tag 2); `echo_service` quotes it back in its
/// reply text.
#[prost(string, tag = "2")]
pub message: String,
}
/// Response message produced by the echo service in the first example.
///
/// Encoding and decoding come from the derived `prost::Message`
/// implementation.
#[derive(Clone, PartialEq, Message)]
pub struct ExampleResponse {
/// Numeric status (protobuf `int32`, tag 1); `echo_service` always sets
/// it to 200.
#[prost(int32, tag = "1")]
pub status: i32,
/// Response text (protobuf `string`, tag 2); `echo_service` fills it with
/// a "Received: ..." line quoting the request.
#[prost(string, tag = "2")]
pub response: String,
}
/// Bidirectional-streaming echo handler used by the first example.
///
/// For every [`ExampleRequest`] read from the inbound stream, the returned
/// stream yields one [`ExampleResponse`] with `status` 200 and a `response`
/// text that quotes the request's `message` and `user_id`.
///
/// # Errors
///
/// The function itself always returns `Ok`. A failure while reading the
/// inbound stream is propagated with `?` inside `try_stream!`, so it shows up
/// as an `Err` item of the returned stream rather than as this function's
/// return value.
async fn echo_service(
    request: Request<Streaming<ExampleRequest>>,
) -> Result<Response<StreamResponseInner<ExampleResponse>>, Status> {
    let mut inbound = request.into_inner();

    let replies = async_stream::try_stream! {
        // One reply per request, in arrival order.
        while let Some(ExampleRequest { user_id, message }) = inbound.message().await? {
            yield ExampleResponse {
                status: 200,
                response: format!("Received: {} from user {}", message, user_id),
            };
        }
    };

    // The annotation drives the coercion to the boxed trait-object stream.
    let boxed: StreamResponseInner<ExampleResponse> = Box::pin(replies);
    Ok(Response::new(boxed))
}
async fn run_direct_api_example() {
println!("\nExample 1: Testing bidirectional streaming with pre-collected messages");
let messages = vec![
ExampleRequest {
user_id: "user123".to_string(),
message: "Hello chat!".to_string(),
},
ExampleRequest {
user_id: "user123".to_string(),
message: "How's the weather?".to_string(),
},
];
println!(
"- Created {} messages to send to the service",
messages.len()
);
let request = streaming_request(messages);
println!("- Calling the echo service...");
match echo_service(request).await {
Ok(response) => {
println!("- Service call successful, processing responses");
let results = stream_to_vec(response).await;
for (i, result) in results.iter().enumerate() {
match result {
Ok(resp) => {
println!(
" Response {}: status={}, message={}",
i + 1,
resp.status,
resp.response
);
}
Err(e) => {
println!(" Error {}: {}", i + 1, e);
}
}
}
println!("- Received {} total responses", results.len());
}
Err(status) => {
println!("- Service call failed: {}", status);
}
}
println!("Example 1 completed!");
}
/// Example 2: the same round trip as Example 1, but using the ready-made
/// `TestRequest` / `TestResponse` types from `tonic_mock::test_utils`.
///
/// Sends three test messages through a locally defined echo handler, collects
/// the response stream with `stream_to_vec`, and prints each item (or error)
/// with a 1-based index. All outcomes are reported on stdout only.
async fn run_test_utils_example() {
    println!("\nExample 2: Testing with TestRequest/TestResponse");

    // Echo handler local to this example. For each inbound message it logs
    // the lossily UTF-8-decoded `id` and `data`, then yields a response whose
    // code is the running message count and whose text repeats both values.
    // Read errors are propagated with `?` inside `try_stream!`, so they
    // appear as `Err` items of the returned stream.
    async fn test_echo_service(
        request: Request<Streaming<TestRequest>>,
    ) -> Result<Response<StreamResponseInner<TestResponse>>, Status> {
        let mut inbound = request.into_inner();

        let replies = async_stream::try_stream! {
            // Running count of messages seen; also used as the response code.
            let mut seen = 0;
            while let Some(msg) = inbound.message().await? {
                seen += 1;
                let id_text = String::from_utf8_lossy(&msg.id).into_owned();
                let data_text = String::from_utf8_lossy(&msg.data).into_owned();
                println!(
                    " Service received message {}: id={}, data={}",
                    seen, id_text, data_text
                );
                let echoed = format!("Echo #{}: id={}, data={}", seen, id_text, data_text);
                yield TestResponse::new(seen, echoed);
            }
            println!(" Service completed after processing {} messages", seen);
        };

        // The annotation drives the coercion to the boxed trait-object stream.
        let boxed: StreamResponseInner<TestResponse> = Box::pin(replies);
        Ok(Response::new(boxed))
    }

    println!("- Creating test messages");
    let test_messages: Vec<TestRequest> = [
        ("msg1", "First message"),
        ("msg2", "Second message"),
        ("msg3", "Final message"),
    ]
    .iter()
    .map(|&(id, data)| TestRequest::new(id, data))
    .collect();
    println!(
        "- Sending {} messages to the test service",
        test_messages.len()
    );

    let request = streaming_request(test_messages);
    match test_echo_service(request).await {
        Err(status) => println!("- Service call failed: {}", status),
        Ok(response) => {
            println!("- Service call successful, processing responses");
            let results = stream_to_vec(response).await;
            println!("- Received {} responses:", results.len());

            // Number the items from 1 for display.
            for (ordinal, result) in (1usize..).zip(&results) {
                match result {
                    Ok(resp) => println!(
                        " Response {}: code={}, message={}",
                        ordinal, resp.code, resp.message
                    ),
                    Err(e) => println!(" Error {}: {}", ordinal, e),
                }
            }
        }
    }

    println!("Example 2 completed!");
}
/// Entry point: runs both streaming examples, one after the other, on the
/// Tokio runtime set up by `#[tokio::main]`.
#[tokio::main]
async fn main() {
println!("=== Bidirectional Streaming Test Examples ===");
// Each example is awaited to completion before the next one starts.
run_direct_api_example().await;
run_test_utils_example().await;
// NOTE(review): this line is printed unconditionally. Both example functions
// return `()` and report a failed service call only via stdout, so
// "successfully" here does not reflect whether the calls succeeded.
println!("\nAll examples completed successfully!");
}