#![allow(clippy::all)]
#![allow(warnings)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::assertions_on_constants)]
use futures::{SinkExt, StreamExt};
use rpcnet::{RpcClient, RpcConfig, RpcError, RpcServer};
use std::time::Duration;
use tokio::time::timeout;
fn create_test_config(addr: &str) -> RpcConfig {
RpcConfig::new("certs/test_cert.pem", addr)
.with_key_path("certs/test_key.pem")
.with_server_name("localhost")
.with_keep_alive_interval(Duration::from_millis(100))
}
#[tokio::test]
async fn test_connection_establishment_failure_paths() {
let bad_config = RpcConfig::new("/nonexistent/cert.pem", "127.0.0.1:0")
.with_key_path("/nonexistent/key.pem")
.with_server_name("localhost");
let result = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), bad_config),
)
.await;
assert!(result.is_err() || result.unwrap().is_err());
let bad_addr_config = RpcConfig::new("certs/test_cert.pem", "invalid_address_format")
.with_key_path("certs/test_key.pem")
.with_server_name("localhost");
let result2 = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), bad_addr_config),
)
.await;
assert!(result2.is_err() || result2.unwrap().is_err());
let config = create_test_config("127.0.0.1:0");
let result3 = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config),
)
.await;
assert!(result3.is_err() || result3.unwrap().is_err());
}
#[tokio::test]
async fn test_call_method_timeout_and_error_paths() {
let config = create_test_config("127.0.0.1:0");
let client_result = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config),
)
.await;
assert!(client_result.is_err() || client_result.unwrap().is_err());
}
#[tokio::test]
async fn test_streaming_method_error_paths() {
let config = create_test_config("127.0.0.1:0");
let client_result = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config),
)
.await;
assert!(client_result.is_err() || client_result.unwrap().is_err());
}
#[tokio::test]
async fn test_keep_alive_configuration_path() {
let config_with_keepalive = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
.with_key_path("certs/test_key.pem")
.with_server_name("localhost")
.with_keep_alive_interval(Duration::from_secs(30));
let result = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config_with_keepalive),
)
.await;
assert!(result.is_err() || result.unwrap().is_err());
let config_no_keepalive = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
.with_key_path("certs/test_key.pem")
.with_server_name("localhost")
.with_keep_alive_interval(Duration::ZERO);
let result2 = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config_no_keepalive),
)
.await;
assert!(result2.is_err() || result2.unwrap().is_err());
}
#[tokio::test]
async fn test_server_streaming_with_mock_data() {
let config = create_test_config("127.0.0.1:0");
let server = RpcServer::new(config);
server
.register_streaming("test_stream", |_request_stream| async move {
Box::pin(futures::stream::iter(vec![
Ok(b"response1".to_vec()),
Ok(b"response2".to_vec()),
Err(RpcError::StreamError("test error".to_string())), Ok(b"response3".to_vec()),
]))
})
.await;
let handlers = server.streaming_handlers.read().await;
assert!(handlers.contains_key("test_stream"));
if let Some(handler) = handlers.get("test_stream") {
let mock_request_stream =
futures::stream::iter(vec![b"request1".to_vec(), b"request2".to_vec()]);
let mut response_stream = handler(Box::pin(mock_request_stream)).await;
let mut responses = Vec::new();
while let Some(response) = response_stream.next().await {
responses.push(response);
if responses.len() >= 4 {
break;
}
}
assert!(!responses.is_empty());
assert!(responses.len() >= 3);
assert!(responses[0].is_ok());
assert!(responses[1].is_ok());
assert!(responses[2].is_err()); }
}
#[tokio::test]
async fn test_response_buffer_parsing_edge_cases() {
let config = create_test_config("127.0.0.1:0");
let result = timeout(
Duration::from_millis(500),
RpcClient::connect("127.0.0.1:19999".parse().unwrap(), config),
)
.await;
assert!(result.is_err() || result.unwrap().is_err());
}
#[tokio::test]
async fn test_concurrent_streaming_error_handling() {
let config = create_test_config("127.0.0.1:0");
let mut handles = Vec::new();
for i in 0..5 {
let config_clone = config.clone();
let handle = tokio::spawn(async move {
let result = timeout(
Duration::from_millis(200),
RpcClient::connect(
format!("127.0.0.1:1999{}", i).parse().unwrap(),
config_clone,
),
)
.await;
assert!(result.is_err() || result.unwrap().is_err());
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_streaming_request_stream_errors() {
let config = create_test_config("127.0.0.1:0");
let server = RpcServer::new(config);
server
.register_streaming("error_test", |mut request_stream| async move {
Box::pin(async_stream::stream! {
let mut count = 0;
while let Some(request_data) = request_stream.next().await {
count += 1;
if count == 1 {
yield Ok(b"first response".to_vec());
} else if count == 2 {
yield Err(RpcError::StreamError("Processing failed".to_string()));
} else if count >= 3 {
break;
}
}
yield Ok(b"final response".to_vec());
})
})
.await;
let handlers = server.streaming_handlers.read().await;
assert!(handlers.contains_key("error_test"));
}
#[tokio::test]
async fn test_response_stream_send_failures() {
let config = create_test_config("127.0.0.1:0");
let server = RpcServer::new(config);
server
.register_streaming("send_fail_test", |_request_stream| async move {
Box::pin(futures::stream::iter(vec![
Ok(b"normal response".to_vec()),
Err(RpcError::StreamError("Simulated send failure".to_string())),
Ok(b"recovery response".to_vec()),
]))
})
.await;
let handlers = server.streaming_handlers.read().await;
assert!(handlers.contains_key("send_fail_test"));
if let Some(handler) = handlers.get("send_fail_test") {
let mock_requests = futures::stream::iter(vec![b"test".to_vec()]);
let mut responses = handler(Box::pin(mock_requests)).await;
let mut response_count = 0;
let mut error_count = 0;
while let Some(response) = responses.next().await {
response_count += 1;
if response.is_err() {
error_count += 1;
}
if response_count >= 3 {
break;
}
}
assert!(response_count >= 3);
assert!(error_count >= 1); }
}
#[tokio::test]
async fn test_various_config_error_paths() {
let config1 = create_test_config("127.0.0.1:0");
let result1 = timeout(
Duration::from_millis(300),
RpcClient::connect("127.0.0.1:19991".parse().unwrap(), config1),
)
.await;
assert!(result1.is_err() || result1.unwrap().is_err());
let config2 = create_test_config("127.0.0.1:0");
let result2 = timeout(
Duration::from_millis(300),
RpcClient::connect("127.0.0.1:19992".parse().unwrap(), config2),
)
.await;
assert!(result2.is_err() || result2.unwrap().is_err());
let config3 =
create_test_config("127.0.0.1:0").with_keep_alive_interval(Duration::from_nanos(1)); let result3 = timeout(
Duration::from_millis(300),
RpcClient::connect("127.0.0.1:19993".parse().unwrap(), config3),
)
.await;
assert!(result3.is_err() || result3.unwrap().is_err());
}
#[tokio::test]
async fn test_buffer_management_edge_cases() {
let config = create_test_config("127.0.0.1:0");
let server = RpcServer::new(config);
server
.register_streaming("buffer_test", |_request_stream| async move {
Box::pin(futures::stream::iter(vec![
Ok(vec![0u8; 1]), Ok(vec![0u8; 8192]), Ok(vec![0u8; 16384]), Ok(vec![]), ]))
})
.await;
let handlers = server.streaming_handlers.read().await;
assert!(handlers.contains_key("buffer_test"));
if let Some(handler) = handlers.get("buffer_test") {
let mock_requests = futures::stream::iter(vec![b"test".to_vec()]);
let mut responses = handler(Box::pin(mock_requests)).await;
let mut response_sizes = Vec::new();
while let Some(response) = responses.next().await {
if let Ok(data) = response {
response_sizes.push(data.len());
}
if response_sizes.len() >= 4 {
break;
}
}
assert_eq!(response_sizes.len(), 4);
assert_eq!(response_sizes[0], 1); assert_eq!(response_sizes[1], 8192); assert_eq!(response_sizes[2], 16384); assert_eq!(response_sizes[3], 0); }
}