#![allow(clippy::all)]
#![allow(warnings)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(dead_code)]
#![allow(clippy::needless_borrows_for_generic_args)]
#![allow(clippy::assertions_on_constants)]
use bytes::Bytes;
use rpcnet::{RpcClient, RpcConfig, RpcError};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
/// In-memory stand-in for a QUIC stream, used by the tests in this file.
///
/// Outbound chunks are recorded, inbound chunks are served from a pre-loaded
/// list, and either direction can be configured to fail.
struct MockQuicStream {
/// Chunks accepted by `send`, in call order.
data_to_send: Vec<Bytes>,
/// Chunks queued by `with_data`; `receive` hands them out front-first.
data_received: Vec<Bytes>,
/// When set, `send` fails with this message instead of recording the chunk.
send_error: Option<String>,
/// When set, `receive` fails with this message.
receive_error: Option<String>,
/// Set to `true` by `close`.
closed: bool,
}
impl MockQuicStream {
fn new() -> Self {
Self {
data_to_send: Vec::new(),
data_received: Vec::new(),
send_error: None,
receive_error: None,
closed: false,
}
}
fn with_data(mut self, data: Vec<u8>) -> Self {
self.data_received.push(Bytes::from(data));
self
}
fn with_send_error(mut self, error: String) -> Self {
self.send_error = Some(error);
self
}
fn with_receive_error(mut self, error: String) -> Self {
self.receive_error = Some(error);
self
}
fn close(mut self) -> Self {
self.closed = true;
self
}
async fn send(&mut self, data: Bytes) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if let Some(ref error) = self.send_error {
return Err(error.clone().into());
}
self.data_to_send.push(data);
Ok(())
}
async fn receive(&mut self) -> Result<Option<Bytes>, Box<dyn std::error::Error + Send + Sync>> {
if let Some(ref error) = self.receive_error {
return Err(error.clone().into());
}
if self.closed && self.data_received.is_empty() {
return Ok(None);
}
if !self.data_received.is_empty() {
Ok(Some(self.data_received.remove(0)))
} else {
Ok(None)
}
}
}
/// Checks that one length-prefixed frame queued on the mock is handed back
/// intact by `receive`, and that the mock is empty afterwards.
///
/// The frame built here is a 4-byte little-endian length followed by the
/// payload. NOTE(review): this exercises only `MockQuicStream`; the
/// request-stream code the test name refers to is not reachable from here.
#[tokio::test]
async fn test_create_request_stream_normal_message() {
    let message = b"test message";
    let len_bytes = (message.len() as u32).to_le_bytes();
    let mut full_data = Vec::with_capacity(len_bytes.len() + message.len());
    full_data.extend_from_slice(&len_bytes);
    full_data.extend_from_slice(message);
    let mut mock_stream = MockQuicStream::new().with_data(full_data);

    let frame = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured")
        .expect("one frame was queued");
    assert_eq!(&frame[..4], &len_bytes[..]);
    assert_eq!(&frame[4..], &message[..]);

    // The single queued frame has been consumed.
    let after = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured");
    assert!(after.is_none());
}
/// Checks that a four-zero-byte end marker queued on the mock is received as
/// a 4-byte frame whose little-endian length prefix decodes to zero.
///
/// NOTE(review): this exercises only `MockQuicStream`; the request-stream
/// code the test name refers to is not reachable from here.
#[tokio::test]
async fn test_create_request_stream_zero_length_end_marker() {
    let end_marker = vec![0, 0, 0, 0];
    let mut mock_stream = MockQuicStream::new().with_data(end_marker);

    let frame = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured")
        .expect("the end marker was queued");
    assert_eq!(frame.len(), 4);
    let declared_len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]);
    assert_eq!(declared_len, 0);

    // Nothing follows the marker.
    let after = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured");
    assert!(after.is_none());
}
/// Checks that a truncated frame (full length prefix, only the first five
/// payload bytes) is delivered as-is and that no further data follows, so the
/// declared length can never be satisfied.
///
/// NOTE(review): this exercises only `MockQuicStream`; the request-stream
/// code the test name refers to is not reachable from here.
#[tokio::test]
async fn test_create_request_stream_incomplete_message() {
    let message = b"test message";
    let len_bytes = (message.len() as u32).to_le_bytes();
    let mut partial_data = Vec::new();
    partial_data.extend_from_slice(&len_bytes);
    partial_data.extend_from_slice(&message[..5]);
    let mut mock_stream = MockQuicStream::new().with_data(partial_data);

    let frame = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured")
        .expect("the partial frame was queued");
    let declared_len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let payload = &frame[4..];
    assert_eq!(declared_len, message.len());
    assert_eq!(payload, &message[..5]);
    assert!(payload.len() < declared_len);

    // The remainder of the message never arrives.
    let after = mock_stream
        .receive()
        .await
        .expect("mock has no receive error configured");
    assert!(after.is_none());
}
/// Checks that a mock configured with a receive error returns that error,
/// with the configured message, from `receive`.
///
/// NOTE(review): this exercises only `MockQuicStream`; the request-stream
/// code the test name refers to is not reachable from here.
#[tokio::test]
async fn test_create_request_stream_connection_error() {
    let mut mock_stream = MockQuicStream::new().with_receive_error("Connection lost".to_string());

    let err = mock_stream
        .receive()
        .await
        .expect_err("receive must return the configured error");
    assert_eq!(err.to_string(), "Connection lost");
}
/// Checks that a closed mock with nothing queued reports end-of-stream
/// (`Ok(None)`) rather than an error.
///
/// NOTE(review): this exercises only `MockQuicStream`; the request-stream
/// code the test name refers to is not reachable from here.
#[tokio::test]
async fn test_create_request_stream_connection_closed() {
    let mut mock_stream = MockQuicStream::new().close();

    let received = mock_stream
        .receive()
        .await
        .expect("a closed stream is not an error");
    assert!(received.is_none());
}
#[tokio::test]
async fn test_send_response_stream_success_responses() {
let responses: Vec<Result<Vec<u8>, RpcError>> = vec![
Ok(b"response1".to_vec()),
Ok(b"response2".to_vec()),
Ok(b"response3".to_vec()),
];
let mock_stream = Arc::new(Mutex::new(MockQuicStream::new()));
let response_stream = Box::pin(futures::stream::iter(responses));
}
#[tokio::test]
async fn test_send_response_stream_error_responses() {
let responses: Vec<Result<Vec<u8>, RpcError>> = vec![
Ok(b"response1".to_vec()),
Err(RpcError::StreamError("Test error".to_string())),
Ok(b"response2".to_vec()),
];
let mock_stream = Arc::new(Mutex::new(MockQuicStream::new()));
let response_stream = Box::pin(futures::stream::iter(responses));
}
#[tokio::test]
async fn test_send_response_stream_send_failure() {
let responses: Vec<Result<Vec<u8>, RpcError>> =
vec![Ok(b"response1".to_vec()), Ok(b"response2".to_vec())];
let mock_stream = Arc::new(Mutex::new(
MockQuicStream::new().with_send_error("Send failed".to_string()),
));
let response_stream = Box::pin(futures::stream::iter(responses));
}
#[tokio::test]
async fn test_send_response_stream_end_marker() {
let responses: Vec<Result<Vec<u8>, RpcError>> = vec![];
let mock_stream = Arc::new(Mutex::new(MockQuicStream::new()));
let response_stream = Box::pin(futures::stream::iter(responses));
}
/// Connecting with certificate and key paths under `/nonexistent` must fail,
/// and the failure must be a TLS, config or connection error.
#[tokio::test]
async fn test_client_connection_tls_error() {
    let client_config = RpcConfig::new("/nonexistent/cert.pem", "127.0.0.1:0")
        .with_key_path("/nonexistent/key.pem")
        .with_server_name("localhost");
    let server_addr = "127.0.0.1:9999".parse().unwrap();

    let result = RpcClient::connect(server_addr, client_config).await;
    assert!(result.is_err());

    if let Err(e) = result {
        let acceptable = matches!(
            e,
            RpcError::TlsError(_) | RpcError::ConfigError(_) | RpcError::ConnectionError(_)
        );
        if !acceptable {
            panic!("Unexpected error type: {:?}", e);
        }
    }
}
/// Asserts that connecting to 127.0.0.1:9999 with the test certificate
/// configuration returns an error.
///
/// NOTE(review): no connection limits are configured here; only the connect
/// failure is asserted.
#[tokio::test]
async fn test_client_connection_limits_error() {
    let server_addr = "127.0.0.1:9999".parse().unwrap();
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let result = RpcClient::connect(server_addr, client_config).await;

    assert!(result.is_err());
}
/// Asserts that connecting fails when the config's second argument is the
/// string `"invalid_address_format"` instead of an address.
#[tokio::test]
async fn test_client_connection_io_error() {
    let base = RpcConfig::new("certs/test_cert.pem", "invalid_address_format");
    let with_key = base.with_key_path("certs/test_key.pem");
    let client_config = with_key.with_server_name("localhost");

    let server_addr = "127.0.0.1:9999".parse().unwrap();
    let result = RpcClient::connect(server_addr, client_config).await;

    assert!(result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:9999 returns an error with
/// the test certificate configuration.
///
/// NOTE(review): only the connect failure is asserted; nothing specific to
/// client start-up is checked.
#[tokio::test]
async fn test_client_connection_start_error() {
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");
    let server_addr = "127.0.0.1:9999".parse().unwrap();

    let result = RpcClient::connect(server_addr, client_config).await;

    assert!(result.is_err());
}
/// Connecting to 127.0.0.1:19999 must fail, and the failure must be a
/// connection, config or TLS error.
#[tokio::test]
async fn test_client_connection_connect_error() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let result = RpcClient::connect(server_addr, client_config).await;
    assert!(result.is_err());

    if let Err(e) = result {
        let acceptable = matches!(
            e,
            RpcError::ConnectionError(_) | RpcError::ConfigError(_) | RpcError::TlsError(_)
        );
        if !acceptable {
            panic!("Unexpected error type: {:?}", e);
        }
    }
}
/// Asserts that connecting to 127.0.0.1:19999 still returns an error when a
/// 30-second keep-alive interval is set on the config.
#[tokio::test]
async fn test_client_keep_alive_configuration_error() {
    let keep_alive = Duration::from_secs(30);
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost")
        .with_keep_alive_interval(keep_alive);
    let server_addr = "127.0.0.1:19999".parse().unwrap();

    let result = RpcClient::connect(server_addr, client_config).await;

    assert!(result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no client is obtained, so no method call or stream send is
/// exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_call_method_stream_send_error() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no call is issued and no timeout is configured or measured;
/// only the connect failure is asserted.
#[tokio::test]
async fn test_call_method_response_timeout() {
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");
    let server_addr = "127.0.0.1:19999".parse().unwrap();

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no response is ever received, so response-id validation is
/// not exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_call_method_invalid_response_id() {
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");
    let client_config = base
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no response is ever received, so response decoding is not
/// exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_call_method_invalid_response_format() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");
    let client_config = base
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no stream is opened, so an unexpected close is not
/// exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_call_method_stream_closed_unexpectedly() {
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no streaming call is made; only the connect failure is
/// asserted.
#[tokio::test]
async fn test_call_streaming_method_send_error() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no request stream is sent, so the send loop is not
/// exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_call_streaming_request_send_loop_error() {
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");
    let server_addr = "127.0.0.1:19999".parse().unwrap();

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no end frame is ever sent; only the connect failure is
/// asserted.
#[tokio::test]
async fn test_call_streaming_end_frame_send_error() {
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");
    let with_key = base.with_key_path("certs/test_key.pem");
    let client_config = with_key.with_server_name("localhost");

    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no streaming response is parsed; only the connect failure
/// is asserted.
#[tokio::test]
async fn test_streaming_response_parsing_zero_length() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let base = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0");
    let with_key = base.with_key_path("certs/test_key.pem");
    let client_config = with_key.with_server_name("localhost");

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no streaming response is received, so incomplete-message
/// handling is not exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_streaming_response_incomplete_message() {
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}
/// Asserts that `RpcClient::connect` to 127.0.0.1:19999 returns an error.
///
/// NOTE(review): no streaming response is in flight, so a mid-stream close
/// is not exercised; only the connect failure is asserted.
#[tokio::test]
async fn test_streaming_response_connection_closed() {
    let server_addr = "127.0.0.1:19999".parse().unwrap();
    let client_config = RpcConfig::new("certs/test_cert.pem", "127.0.0.1:0")
        .with_key_path("certs/test_key.pem")
        .with_server_name("localhost");

    let client_result = RpcClient::connect(server_addr, client_config).await;

    assert!(client_result.is_err());
}