use erpc_rust::{
auxiliary::{MessageInfo, MessageType},
client::ClientManager,
codec::{BasicCodec, BasicCodecFactory, Codec},
server::{BaseService, FunctionHandler, Server, ServerBuilder},
transport::{memory::MemoryTransport, TcpTransport},
};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::time::sleep;
/// Asks the OS for a currently free TCP port on the loopback interface.
///
/// A probe listener is bound to port `0`, the port it was assigned is read
/// back, and the probe is released when the function returns. The port is
/// therefore *not* reserved: the caller has to bind it again itself.
///
/// # Panics
/// Panics if the probe listener cannot be bound or its local address cannot
/// be read.
async fn find_available_port() -> u16 {
    use tokio::net::TcpListener;

    let probe = TcpListener::bind("127.0.0.1:0").await.unwrap();
    // `probe` goes out of scope (and closes) right after the port is read.
    probe.local_addr().unwrap().port()
}
/// Builds the string-oriented test service registered under service id `1`.
///
/// Methods:
/// * `1` — reads a string and replies with `"Echo: <input>"`.
/// * `2` — reads a string and replies with its upper-cased form.
/// * `3` — reads a string and writes no reply.
fn create_echo_service() -> BaseService {
    let mut echo = BaseService::new(1);

    // Method 1: prefix the input with "Echo: ".
    let echo_handler = FunctionHandler::new(|sequence, codec| {
        let text = codec.read_string()?;
        codec.start_write_message(&MessageInfo::new(MessageType::Reply, 1, 1, sequence))?;
        codec.write_string(&format!("Echo: {}", text))?;
        Ok(())
    });
    echo.add_method(1, echo_handler);

    // Method 2: upper-case the input.
    let upper_handler = FunctionHandler::new(|sequence, codec| {
        let text = codec.read_string()?;
        codec.start_write_message(&MessageInfo::new(MessageType::Reply, 1, 2, sequence))?;
        codec.write_string(&text.to_uppercase())?;
        Ok(())
    });
    echo.add_method(2, upper_handler);

    // Method 3: consume the payload; nothing is written back.
    let notify_handler = FunctionHandler::new(|_sequence, codec| {
        let _payload = codec.read_string()?;
        Ok(())
    });
    echo.add_method(3, notify_handler);

    echo
}
/// Builds the numeric test service registered under service id `42`.
///
/// Methods:
/// * `1` — reads two floats and replies with their sum.
/// * `2` — reads two 32-bit integers and replies with their product.
fn create_calculator_service() -> BaseService {
    let mut calculator = BaseService::new(42);

    // Method 1: float addition.
    let add_handler = FunctionHandler::new(|sequence, codec| {
        let lhs = codec.read_float()?;
        let rhs = codec.read_float()?;
        let sum = lhs + rhs;
        codec.start_write_message(&MessageInfo::new(MessageType::Reply, 42, 1, sequence))?;
        codec.write_float(sum)?;
        Ok(())
    });
    calculator.add_method(1, add_handler);

    // Method 2: integer multiplication.
    let multiply_handler = FunctionHandler::new(|sequence, codec| {
        let lhs = codec.read_int32()?;
        let rhs = codec.read_int32()?;
        let product = lhs * rhs;
        codec.start_write_message(&MessageInfo::new(MessageType::Reply, 42, 2, sequence))?;
        codec.write_int32(product)?;
        Ok(())
    });
    calculator.add_method(2, multiply_handler);

    calculator
}
#[tokio::test]
async fn test_tcp_client_server_echo() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{}", port);
let server_addr = addr.clone();
let server_task = tokio::spawn(async move {
let listener = TcpListener::bind(&server_addr).await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let tcp_transport = TcpTransport::from_stream(stream);
let codec_factory = BasicCodecFactory::new();
let echo_service = create_echo_service();
let mut server = ServerBuilder::new()
.transport(tcp_transport)
.codec_factory(codec_factory)
.service(Arc::new(echo_service))
.build()
.await
.unwrap();
tokio::select! {
result = server.run() => {
if let Err(e) = result {
eprintln!("Server error: {}", e);
}
}
_ = sleep(Duration::from_secs(10)) => {
}
}
});
sleep(Duration::from_millis(100)).await;
let tcp_transport = TcpTransport::connect(&addr).await.unwrap();
let codec_factory = BasicCodecFactory::new();
let mut client = ClientManager::new(tcp_transport, codec_factory);
let mut request_codec = BasicCodec::new();
request_codec.write_string("Hello, TCP server!").unwrap();
let request_data = request_codec.as_bytes().to_vec();
let response_data = client
.perform_request(1, 1, false, request_data)
.await
.unwrap();
let mut response_codec = BasicCodec::from_data(response_data);
let response = response_codec.read_string().unwrap();
assert_eq!(response, "Echo: Hello, TCP server!");
let mut request_codec = BasicCodec::new();
request_codec.write_string("make me uppercase").unwrap();
let request_data = request_codec.as_bytes().to_vec();
let response_data = client
.perform_request(1, 2, false, request_data)
.await
.unwrap();
let mut response_codec = BasicCodec::from_data(response_data);
let response = response_codec.read_string().unwrap();
assert_eq!(response, "MAKE ME UPPERCASE");
let mut oneway_codec = BasicCodec::new();
oneway_codec.write_string("Oneway notification").unwrap();
let oneway_data = oneway_codec.as_bytes().to_vec();
client
.perform_request(1, 3, true, oneway_data)
.await
.unwrap();
client.close().await.unwrap();
server_task.abort();
}
#[tokio::test]
async fn test_tcp_calculator_service() {
let port = find_available_port().await;
let addr = format!("127.0.0.1:{}", port);
let server_addr = addr.clone();
let server_task = tokio::spawn(async move {
let listener = TcpListener::bind(&server_addr).await.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let tcp_transport = TcpTransport::from_stream(stream);
let codec_factory = BasicCodecFactory::new();
let calc_service = create_calculator_service();
let mut server = ServerBuilder::new()
.transport(tcp_transport)
.codec_factory(codec_factory)
.service(Arc::new(calc_service))
.build()
.await
.unwrap();
tokio::select! {
result = server.run() => {
if let Err(e) = result {
eprintln!("Server error: {}", e);
}
}
_ = sleep(Duration::from_secs(10)) => {}
}
});
sleep(Duration::from_millis(100)).await;
let tcp_transport = TcpTransport::connect(&addr).await.unwrap();
let codec_factory = BasicCodecFactory::new();
let mut client = ClientManager::new(tcp_transport, codec_factory);
let mut request_codec = BasicCodec::new();
request_codec.write_float(3.14).unwrap();
request_codec.write_float(2.86).unwrap();
let request_data = request_codec.as_bytes().to_vec();
let response_data = client
.perform_request(42, 1, false, request_data)
.await
.unwrap();
let mut response_codec = BasicCodec::from_data(response_data);
let result = response_codec.read_float().unwrap();
assert!((result - 6.0).abs() < 0.001);
let mut request_codec = BasicCodec::new();
request_codec.write_int32(7).unwrap();
request_codec.write_int32(6).unwrap();
let request_data = request_codec.as_bytes().to_vec();
let response_data = client
.perform_request(42, 2, false, request_data)
.await
.unwrap();
let mut response_codec = BasicCodec::from_data(response_data);
let result = response_codec.read_int32().unwrap();
assert_eq!(result, 42);
client.close().await.unwrap();
server_task.abort();
}
/// Three clients connect concurrently over TCP, each served by its own
/// server instance running the echo service.
///
/// The listener is bound in the test body *before* the accept task is
/// spawned, so the clients can connect straight away instead of sleeping and
/// hoping the accept task has reached `bind` by then. Accept failures are
/// logged rather than silently skipped.
#[tokio::test]
async fn test_multiple_clients_tcp() {
    let port = find_available_port().await;
    let addr = format!("127.0.0.1:{}", port);

    // Binding here (not inside the task) guarantees the socket is listening
    // by the time the clients connect below.
    let listener = TcpListener::bind(&addr)
        .await
        .expect("failed to bind test listener");

    let server_task = tokio::spawn(async move {
        // One accept attempt per expected client.
        for _ in 0..3 {
            match listener.accept().await {
                Ok((stream, _)) => {
                    let tcp_transport = TcpTransport::from_stream(stream);
                    let codec_factory = BasicCodecFactory::new();
                    let echo_service = create_echo_service();
                    // Each connection gets a dedicated server on its own task.
                    tokio::spawn(async move {
                        let mut server = ServerBuilder::new()
                            .transport(tcp_transport)
                            .codec_factory(codec_factory)
                            .service(Arc::new(echo_service))
                            .build()
                            .await
                            .unwrap();
                        tokio::select! {
                            result = server.run() => {
                                if let Err(e) = result {
                                    eprintln!("Server error: {}", e);
                                }
                            }
                            _ = sleep(Duration::from_secs(5)) => {}
                        }
                    });
                }
                Err(e) => eprintln!("Accept error: {}", e),
            }
        }
        // Keep this task (and the listener it owns) alive until the test
        // aborts it.
        sleep(Duration::from_secs(10)).await;
    });

    let mut client_tasks = Vec::new();
    for i in 0..3 {
        let client_addr = addr.clone();
        let client_task = tokio::spawn(async move {
            let tcp_transport = TcpTransport::connect(&client_addr).await.unwrap();
            let codec_factory = BasicCodecFactory::new();
            let mut client = ClientManager::new(tcp_transport, codec_factory);

            let message = format!("Hello from client {}", i);
            let mut request_codec = BasicCodec::new();
            request_codec.write_string(&message).unwrap();
            let request_data = request_codec.as_bytes().to_vec();

            let response_data = client
                .perform_request(1, 1, false, request_data)
                .await
                .unwrap();
            let mut response_codec = BasicCodec::from_data(response_data);
            let response = response_codec.read_string().unwrap();
            client.close().await.unwrap();
            response
        });
        client_tasks.push(client_task);
    }

    // Results are matched to clients by position in `client_tasks`.
    let results = futures::future::join_all(client_tasks).await;
    for (i, result) in results.into_iter().enumerate() {
        let response = result.unwrap();
        let expected = format!("Echo: Hello from client {}", i);
        assert_eq!(response, expected);
    }

    server_task.abort();
}
/// Single echo round trip over the in-memory transport, printing the raw
/// request and response bytes along the way.
#[tokio::test]
async fn test_simple_memory_debug() {
    let (client_transport, server_transport) = MemoryTransport::pair();

    let server_task = tokio::spawn(async move {
        let factory = BasicCodecFactory::new();
        let service = create_echo_service();
        let builder = ServerBuilder::new()
            .transport(server_transport)
            .codec_factory(factory)
            .service(Arc::new(service));
        let mut server = builder.build().await.unwrap();
        // Serve until the server stops by itself or 5 seconds have elapsed.
        tokio::select! {
            outcome = server.run() => {
                if let Err(err) = outcome {
                    eprintln!("Server error: {}", err);
                }
            }
            _ = sleep(Duration::from_secs(5)) => {}
        }
    });

    // Short pause before the client starts talking to the server task.
    sleep(Duration::from_millis(50)).await;

    let mut client = ClientManager::new(client_transport, BasicCodecFactory::new());

    let test_string = "Memory transport test";
    let request_data = {
        let mut encoder = BasicCodec::new();
        encoder.write_string(test_string).unwrap();
        encoder.as_bytes().to_vec()
    };
    println!("Request data: {:?}", request_data);

    let response_data = client
        .perform_request(1, 1, false, request_data)
        .await
        .unwrap();
    println!("Response data: {:?}", response_data);

    let mut decoder = BasicCodec::from_data(response_data);
    let echoed = decoder.read_string().unwrap();
    assert_eq!(echoed, "Echo: Memory transport test");

    client.close().await.unwrap();
    server_task.abort();
}
/// A request addressed to service id `99` must fail when the server only has
/// the echo service registered.
#[tokio::test]
async fn test_error_handling_invalid_service() {
    let (client_transport, server_transport) = MemoryTransport::pair();

    let server_task = tokio::spawn(async move {
        let factory = BasicCodecFactory::new();
        let service = create_echo_service();
        let builder = ServerBuilder::new()
            .transport(server_transport)
            .codec_factory(factory)
            .service(Arc::new(service));
        let mut server = builder.build().await.unwrap();
        // Serve until the server stops by itself or 3 seconds have elapsed.
        tokio::select! {
            outcome = server.run() => {
                if let Err(err) = outcome {
                    eprintln!("Server error: {}", err);
                }
            }
            _ = sleep(Duration::from_secs(3)) => {}
        }
    });

    // Short pause before the client starts talking to the server task.
    sleep(Duration::from_millis(50)).await;

    let mut client = ClientManager::new(client_transport, BasicCodecFactory::new());

    let request_data = {
        let mut encoder = BasicCodec::new();
        encoder.write_string("test").unwrap();
        encoder.as_bytes().to_vec()
    };

    // Service 99 is not the id the echo service was created with.
    let outcome = client.perform_request(99, 1, false, request_data).await;
    assert!(outcome.is_err());

    client.close().await.unwrap();
    server_task.abort();
}
/// Sends 50 sequential addition requests over one in-memory connection and
/// checks every reply.
#[tokio::test]
async fn test_stress_multiple_requests() {
    let (client_transport, server_transport) = MemoryTransport::pair();

    let server_task = tokio::spawn(async move {
        let factory = BasicCodecFactory::new();
        let service = create_calculator_service();
        let builder = ServerBuilder::new()
            .transport(server_transport)
            .codec_factory(factory)
            .service(Arc::new(service));
        let mut server = builder.build().await.unwrap();
        // Serve until the server stops by itself or 10 seconds have elapsed.
        tokio::select! {
            outcome = server.run() => {
                if let Err(err) = outcome {
                    eprintln!("Server error: {}", err);
                }
            }
            _ = sleep(Duration::from_secs(10)) => {}
        }
    });

    // Short pause before the client starts talking to the server task.
    sleep(Duration::from_millis(50)).await;

    let mut client = ClientManager::new(client_transport, BasicCodecFactory::new());

    for i in 1..=50 {
        // Request: i + 2.0 via calculator method 1.
        let request_data = {
            let mut encoder = BasicCodec::new();
            encoder.write_float(i as f32).unwrap();
            encoder.write_float(2.0).unwrap();
            encoder.as_bytes().to_vec()
        };

        let response_data = client
            .perform_request(42, 1, false, request_data)
            .await
            .unwrap();

        let mut decoder = BasicCodec::from_data(response_data);
        let result = decoder.read_float().unwrap();
        let expected = i as f32 + 2.0;
        assert!(
            (result - expected).abs() < 0.001,
            "Request {}: expected {}, got {}",
            i,
            expected,
            result
        );
    }

    client.close().await.unwrap();
    server_task.abort();
}