#![allow(clippy::expect_used, clippy::unwrap_used)]
#[cfg(all(windows, not(feature = "use-tcp-on-windows")))]
mod windows_pipe_tests {
use futures::{SinkExt, StreamExt};
use network_protocol::config::PROTOCOL_VERSION;
use network_protocol::core::packet::Packet;
use network_protocol::transport::windows_pipe;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
#[tokio::test]
async fn test_named_pipe_connection() {
let pipe_name = "\\\\.\\pipe\\test_connection";
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let server_pipe = pipe_name.to_string();
let server_handle = tokio::spawn(async move {
windows_pipe::start_server_with_shutdown(&server_pipe, shutdown_rx)
.await
.expect("Server failed");
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut client = timeout(Duration::from_secs(5), windows_pipe::connect(pipe_name))
.await
.expect("Connection timeout")
.expect("Failed to connect");
let test_packet = Packet {
version: PROTOCOL_VERSION,
payload: b"Hello, Windows Pipes!".to_vec(),
};
client
.send(test_packet.clone())
.await
.expect("Failed to send packet");
let response = timeout(Duration::from_secs(5), client.next())
.await
.expect("Response timeout")
.expect("Stream ended")
.expect("Failed to receive packet");
assert_eq!(response.payload, test_packet.payload);
drop(client);
tokio::time::sleep(Duration::from_millis(200)).await;
shutdown_tx.send(()).await.expect("Failed to send shutdown");
timeout(Duration::from_secs(5), server_handle)
.await
.expect("Server shutdown timeout")
.expect("Server task panicked");
}
#[tokio::test]
async fn test_multiple_connections() {
let pipe_name = "\\\\.\\pipe\\test_multiple";
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let server_pipe = pipe_name.to_string();
let server_handle = tokio::spawn(async move {
windows_pipe::start_server_with_shutdown(&server_pipe, shutdown_rx)
.await
.expect("Server failed");
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut clients = Vec::new();
for _ in 0..3 {
let client = windows_pipe::connect(pipe_name)
.await
.expect("Failed to connect");
clients.push(client);
tokio::time::sleep(Duration::from_millis(50)).await;
}
for (i, client) in clients.iter_mut().enumerate() {
let test_packet = Packet {
version: PROTOCOL_VERSION,
payload: format!("Client {}", i).into_bytes(),
};
client
.send(test_packet.clone())
.await
.expect("Failed to send");
let response = timeout(Duration::from_secs(5), client.next())
.await
.expect("Timeout")
.expect("Stream ended")
.expect("Failed to receive");
assert_eq!(response.payload, test_packet.payload);
}
drop(clients);
tokio::time::sleep(Duration::from_millis(200)).await;
shutdown_tx.send(()).await.expect("Failed to shutdown");
timeout(Duration::from_secs(5), server_handle)
.await
.expect("Server shutdown timeout")
.expect("Server task panicked");
}
#[tokio::test]
async fn test_large_packet() {
let pipe_name = "\\\\.\\pipe\\test_large";
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let server_pipe = pipe_name.to_string();
let server_handle = tokio::spawn(async move {
windows_pipe::start_server_with_shutdown(&server_pipe, shutdown_rx)
.await
.expect("Server failed");
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut client = windows_pipe::connect(pipe_name)
.await
.expect("Failed to connect");
let large_payload = vec![0xAB; 1024 * 1024];
let test_packet = Packet {
version: PROTOCOL_VERSION,
payload: large_payload.clone(),
};
client
.send(test_packet.clone())
.await
.expect("Failed to send large packet");
let response = timeout(Duration::from_secs(10), client.next())
.await
.expect("Timeout")
.expect("Stream ended")
.expect("Failed to receive");
assert_eq!(response.payload.len(), large_payload.len());
assert_eq!(response.payload, large_payload);
drop(client);
tokio::time::sleep(Duration::from_millis(200)).await;
shutdown_tx.send(()).await.expect("Failed to shutdown");
timeout(Duration::from_secs(5), server_handle)
.await
.expect("Server shutdown timeout")
.expect("Server task panicked");
}
#[tokio::test]
async fn test_graceful_shutdown() {
let pipe_name = "\\\\.\\pipe\\test_shutdown";
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let server_pipe = pipe_name.to_string();
let server_handle = tokio::spawn(async move {
windows_pipe::start_server_with_shutdown(&server_pipe, shutdown_rx)
.await
.expect("Server failed");
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut client = windows_pipe::connect(pipe_name)
.await
.expect("Failed to connect");
let test_packet = Packet {
version: PROTOCOL_VERSION,
payload: b"test".to_vec(),
};
client.send(test_packet).await.expect("Failed to send");
shutdown_tx.send(()).await.expect("Failed to shutdown");
timeout(Duration::from_secs(15), server_handle)
.await
.expect("Server shutdown timeout")
.expect("Server task panicked");
}
}
#[cfg(not(windows))]
#[test]
fn windows_tests_skipped() {
println!("Windows Named Pipes tests are only available on Windows");
}