#![allow(clippy::all)]
#![allow(warnings)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::assertions_on_constants)]
use rpcnet::{RpcClient, RpcConfig, RpcServer};
use std::time::Duration;
use tokio::time::timeout;
/// Builds the `RpcConfig` used by both the server and the clients in this file.
///
/// The certificate is read from `certs/test_cert.pem` and the private key from
/// `certs/test_key.pem`; the server name is `localhost`. The second argument to
/// `RpcConfig::new` is presumably the bind address (port 0) — confirm against
/// the `rpcnet` API.
fn test_config() -> RpcConfig {
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");
    let with_key = base.with_key_path("certs/test_key.pem");
    with_key.with_server_name("localhost")
}
/// Spins up an in-process RPC server for the tests in this file and returns
/// the socket address reported by the bound server.
///
/// Registered handlers:
/// - `simple_echo`: returns the request bytes unchanged.
/// - `multiply`: bincode-decodes an `i32`, doubles it, and bincode-encodes the
///   result; a payload that does not decode yields a `SerializationError`.
/// - `simple_stream` (streaming): ignores the request stream and yields three
///   `stream_message_{i}` byte payloads, sleeping 1 ms after each one.
///
/// # Errors
/// Propagates any error returned by `bind()` or `local_addr()`.
///
/// # Panics
/// The spawned background task panics if `start` returns an error.
async fn start_test_server() -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
    let mut server = RpcServer::new(test_config());

    server
        .register("simple_echo", |params| async move { Ok(params) })
        .await;

    server
        .register("multiply", |params| async move {
            // Borrow the request bytes for decoding (the original token here
            // was a mis-encoded `&params`).
            if let Ok(number) = bincode::deserialize::<i32>(&params) {
                let result = number * 2;
                bincode::serialize(&result).map_err(rpcnet::RpcError::SerializationError)
            } else {
                Err(rpcnet::RpcError::SerializationError(bincode::Error::new(
                    bincode::ErrorKind::Custom("Invalid input".to_string()),
                )))
            }
        })
        .await;

    server
        .register_streaming("simple_stream", |_request_stream| async move {
            Box::pin(async_stream::stream! {
                for i in 0..3 {
                    yield Ok(format!("stream_message_{}", i).into_bytes());
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            })
                as std::pin::Pin<
                    Box<dyn futures::Stream<Item = Result<Vec<u8>, rpcnet::RpcError>> + Send>,
                >
        })
        .await;

    // Bind first so the address can be read before the server is moved into
    // the background task.
    let quic_server = server.bind()?;
    let addr = quic_server.local_addr()?;

    let mut server_clone = server.clone();
    tokio::spawn(async move {
        server_clone
            .start(quic_server)
            .await
            .expect("Server should start");
    });

    // Fixed pause before returning; presumably gives the spawned task time to
    // start serving — best-effort, not a readiness signal.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(addr)
}
/// A unary `simple_echo` call must return exactly the bytes that were sent.
#[tokio::test]
async fn test_regular_rpc_still_works() {
    let addr = start_test_server().await.expect("Server should start");
    let rpc = RpcClient::connect(addr, test_config())
        .await
        .expect("Client should connect");

    let payload = b"hello".to_vec();
    let echoed = rpc
        .call("simple_echo", payload)
        .await
        .expect("Regular RPC should work");

    assert_eq!(echoed, b"hello");
}
/// Sends a bincode-encoded `i32` to the `multiply` handler and checks that the
/// decoded reply is the doubled value (21 -> 42).
#[tokio::test]
async fn test_regular_rpc_with_serialization() {
    let addr = start_test_server().await.expect("Server should start");
    let rpc = RpcClient::connect(addr, test_config())
        .await
        .expect("Client should connect");

    let input: i32 = 21;
    let encoded = bincode::serialize(&input).expect("Serialization should work");

    let raw_reply = rpc
        .call("multiply", encoded)
        .await
        .expect("Regular RPC should work");

    let doubled = bincode::deserialize::<i32>(&raw_reply).expect("Deserialization should work");
    assert_eq!(doubled, 42);
}
/// With a streaming handler registered on the server, a client can still
/// connect and a unary `simple_echo` call still round-trips its payload.
#[tokio::test]
async fn test_streaming_server_registration() {
    let addr = start_test_server().await.expect("Server should start");
    let rpc = RpcClient::connect(addr, test_config())
        .await
        .expect("Client should connect even with streaming handlers");

    let payload = b"test".to_vec();
    let echoed = rpc
        .call("simple_echo", payload)
        .await
        .expect("Regular RPC should work with streaming handlers present");

    assert_eq!(echoed, b"test");
}
/// Spawns five tasks, each with its own client connection, that echo a
/// distinct message concurrently; every task must finish within 10 seconds and
/// return the message it sent.
#[tokio::test]
async fn test_concurrent_regular_rpc() {
    let server_addr = start_test_server().await.expect("Server should start");

    // One spawned task per message index; each task opens its own connection.
    let handles: Vec<_> = (0..5)
        .map(move |i| {
            tokio::spawn(async move {
                let rpc = RpcClient::connect(server_addr, test_config())
                    .await
                    .expect("Client should connect");
                let outgoing = format!("message_{}", i);
                let reply = rpc
                    .call("simple_echo", outgoing.as_bytes().to_vec())
                    .await
                    .expect("Regular RPC should work");
                String::from_utf8(reply).expect("Should be valid UTF8")
            })
        })
        .collect();

    let joined = futures::future::join_all(handles);
    let outcomes = timeout(Duration::from_secs(10), joined)
        .await
        .expect("All tasks should complete within timeout");

    // `join_all` preserves input order, so position `idx` holds task `idx`.
    for (idx, outcome) in outcomes.iter().enumerate() {
        let echoed = outcome
            .as_ref()
            .unwrap_or_else(|e| panic!("Task {} failed: {:?}", idx, e));
        assert_eq!(echoed, &format!("message_{}", idx));
    }
}
/// Registering a streaming handler must type-check and must finish within
/// five seconds rather than hang.
#[tokio::test]
async fn test_streaming_handler_compilation() {
    // Boxed, pinned byte stream type that the handler's output is cast to.
    type ByteStream =
        std::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<u8>, rpcnet::RpcError>> + Send>>;

    let server = RpcServer::new(test_config());

    let registration = async {
        server
            .register_streaming("test_handler", |_request_stream| async move {
                Box::pin(async_stream::stream! {
                    yield Ok(b"test_response".to_vec());
                }) as ByteStream
            })
            .await;
        "completed"
    };
    let registration_result = timeout(Duration::from_secs(5), registration).await;

    assert!(
        registration_result.is_ok(),
        "Streaming handler registration should not hang"
    );
    assert_eq!(registration_result.unwrap(), "completed");
}
/// A server that has only a streaming handler registered must still bind
/// successfully.
#[tokio::test]
async fn test_server_bind_with_streaming() {
    // Boxed, pinned byte stream type that the handler's output is cast to.
    type ByteStream =
        std::pin::Pin<Box<dyn futures::Stream<Item = Result<Vec<u8>, rpcnet::RpcError>> + Send>>;

    let mut server = RpcServer::new(test_config());

    server
        .register_streaming("bind_test", |_request_stream| async move {
            Box::pin(async_stream::stream! {
                yield Ok(b"bind_successful".to_vec());
            }) as ByteStream
        })
        .await;

    let bound = server.bind();
    assert!(
        bound.is_ok(),
        "Server should bind successfully with streaming handlers"
    );
}
/// Coarse latency guard: 100 sequential `simple_echo` calls over a single
/// client connection must all succeed and finish in under 10 seconds.
#[tokio::test]
async fn test_regular_rpc_performance() {
    let addr = start_test_server().await.expect("Server should start");
    let rpc = RpcClient::connect(addr, test_config())
        .await
        .expect("Client should connect");

    let started = std::time::Instant::now();
    for seq in 0..100 {
        let payload = format!("msg_{}", seq).into_bytes();
        // Only success matters here; the echoed bytes are not inspected.
        let _reply = rpc
            .call("simple_echo", payload)
            .await
            .expect("RPC should succeed");
    }
    let took = started.elapsed();

    assert!(
        took < Duration::from_secs(10),
        "100 RPC calls should complete within 10 seconds"
    );
}