use std::time::Duration;
use tonic::{Request, Response, Status, Streaming};
use tonic_mock::{
BidirectionalStreamingTest, StreamResponseInner,
test_utils::{TestRequest, TestResponse},
};
#[tokio::main]
async fn main() {
println!("=== BidirectionalStreamingTest API Example ===");
basic_example().await;
interactive_example().await;
timeout_example().await;
println!("\nAll examples completed successfully!");
}
async fn echo_service(
request: Request<Streaming<TestRequest>>,
) -> Result<Response<StreamResponseInner<TestResponse>>, Status> {
let mut stream = request.into_inner();
let response_stream = async_stream::try_stream! {
let mut counter = 0;
while let Some(msg) = stream.message().await? {
counter += 1;
let id_str = String::from_utf8_lossy(&msg.id).to_string();
let data_str = String::from_utf8_lossy(&msg.data).to_string();
println!(" Service received message {}: id={}, data={}", counter, id_str, data_str);
yield TestResponse::new(
counter,
format!("Echo #{}: id={}, data={}", counter, id_str, data_str)
);
}
println!(" Service stream completed");
};
Ok(Response::new(Box::pin(response_stream)))
}
async fn delayed_service(
request: Request<Streaming<TestRequest>>,
) -> Result<Response<StreamResponseInner<TestResponse>>, Status> {
let mut stream = request.into_inner();
let response_stream = async_stream::try_stream! {
let mut counter = 0;
while let Some(msg) = stream.message().await? {
counter += 1;
let id_str = String::from_utf8_lossy(&msg.id).to_string();
let delay = counter * 50;
println!(" Service adding {}ms delay", delay);
tokio::time::sleep(Duration::from_millis(delay as u64)).await;
yield TestResponse::new(
counter,
format!("Delayed echo #{} ({}ms): {}", counter, delay, id_str)
);
}
println!(" Delayed service stream completed");
};
Ok(Response::new(Box::pin(response_stream)))
}
async fn basic_example() {
println!("\nExample 1: Basic bidirectional streaming");
println!("Pattern: Send all messages → complete() → get all responses");
let mut test = BidirectionalStreamingTest::new(echo_service);
println!("- Sending first message");
test.send_client_message(TestRequest::new("msg1", "First message"))
.await;
println!("- Sending second message");
test.send_client_message(TestRequest::new("msg2", "Second message"))
.await;
println!("- Completing the client stream");
test.complete().await;
println!("- Getting first response");
if let Some(response) = test.get_server_response().await {
println!(
"- Received response: code={}, message={}",
response.code, response.message
);
} else {
println!("- No response received");
}
println!("- Getting second response");
if let Some(response) = test.get_server_response().await {
println!(
"- Received response: code={}, message={}",
response.code, response.message
);
} else {
println!("- No response received");
}
if test.get_server_response().await.is_some() {
println!("- Unexpected extra response received");
} else {
println!("- No more responses (as expected)");
}
println!("Example 1 completed!");
}
async fn interactive_example() {
println!("\nExample 2: Interactive bidirectional streaming");
println!("Pattern: Send message → get response → send message → get response → complete()");
println!(
"IMPORTANT NOTE: For the interactive pattern, you must send ALL messages before getting ANY responses"
);
let mut test = BidirectionalStreamingTest::new(echo_service);
println!("- Sending first message");
test.send_client_message(TestRequest::new("msg1", "First message"))
.await;
println!("- Sending second message");
test.send_client_message(TestRequest::new("msg2", "Second message"))
.await;
println!("- Completing the client stream (required before getting responses)");
test.complete().await;
println!("- Getting first response");
if let Some(response) = test.get_server_response().await {
println!(
"- Received response: code={}, message={}",
response.code, response.message
);
} else {
println!("- No response received");
}
println!("- Getting second response");
if let Some(response) = test.get_server_response().await {
println!(
"- Received response: code={}, message={}",
response.code, response.message
);
} else {
println!("- No response received");
}
println!("Example 2 completed!");
}
async fn timeout_example() {
println!("\nExample 3: Testing with timeouts");
let mut test = BidirectionalStreamingTest::new(delayed_service);
println!("- Sending first message");
test.send_client_message(TestRequest::new("test1", "Test message"))
.await;
println!("- Sending second message (will have 100ms delay)");
test.send_client_message(TestRequest::new("test2", "Will have 100ms delay"))
.await;
println!("- Completing client stream");
test.complete().await;
println!("- Getting first response with 100ms timeout (should succeed)");
match test
.get_server_response_with_timeout(Duration::from_millis(100))
.await
{
Ok(Some(resp)) => {
println!(
"- Received response: code={}, message={}",
resp.code, resp.message
);
}
Ok(None) => println!("- No response received"),
Err(status) => println!("- Error: {}", status),
}
println!("- Getting second response with 50ms timeout (should timeout)");
match test
.get_server_response_with_timeout(Duration::from_millis(50))
.await
{
Ok(Some(_)) => println!("- Unexpected: Received response within timeout"),
Ok(None) => println!("- No response received"),
Err(status) => println!("- Error as expected: {}", status),
}
println!("- Retrying with 150ms timeout (should succeed)");
match test
.get_server_response_with_timeout(Duration::from_millis(150))
.await
{
Ok(Some(resp)) => {
println!(
"- Received response: code={}, message={}",
resp.code, resp.message
);
}
Ok(None) => println!("- No response received"),
Err(status) => println!("- Error: {}", status),
}
println!("Example 3 completed!");
}