#![cfg(feature = "full")]
#![cfg(all(windows))]
use std::io;
use std::mem;
use std::os::windows::io::AsRawHandle;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
use tokio::time;
use winapi::shared::winerror;
#[tokio::test]
async fn test_named_pipe_client_drop() -> io::Result<()> {
const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop";
let mut server = ServerOptions::new().create(PIPE_NAME)?;
assert_eq!(num_instances("test-named-pipe-client-drop")?, 1);
let client = ClientOptions::new().open(PIPE_NAME)?;
server.connect().await?;
drop(client);
match server.write_all(b"ping").await {
Err(e) if e.raw_os_error() == Some(winerror::ERROR_NO_DATA as i32) => (),
x => panic!("{:?}", x),
}
Ok(())
}
/// Exchanges one line in each direction between a single server and a single
/// client, each running on its own task.
#[tokio::test]
async fn test_named_pipe_single_client() -> io::Result<()> {
    use tokio::io::{AsyncBufReadExt as _, BufReader};

    const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client";

    // The pipe instance is created here, before either task is spawned.
    let listener = ServerOptions::new().create(PIPE_NAME)?;

    // Server side: wait for the client, read its line, answer with "pong".
    let server_task = tokio::spawn(async move {
        listener.connect().await?;

        let mut stream = BufReader::new(listener);
        let mut request = String::new();
        stream.read_line(&mut request).await?;
        stream.write_all(b"pong\n").await?;

        Ok::<_, io::Error>(request)
    });

    // Client side: open the pipe, send "ping", read the reply line.
    let client_task = tokio::spawn(async move {
        let pipe = ClientOptions::new().open(PIPE_NAME)?;

        let mut stream = BufReader::new(pipe);
        let mut reply = String::new();
        stream.write_all(b"ping\n").await?;
        stream.read_line(&mut reply).await?;

        Ok::<_, io::Error>(reply)
    });

    // The first `?` surfaces join errors, the per-task `?` surfaces I/O errors.
    let (server_saw, client_saw) = tokio::try_join!(server_task, client_task)?;
    assert_eq!(server_saw?, "ping\n");
    assert_eq!(client_saw?, "pong\n");

    Ok(())
}
#[tokio::test]
async fn test_named_pipe_multi_client() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt as _, BufReader};
const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client";
const N: usize = 10;
let mut server = ServerOptions::new().create(PIPE_NAME)?;
let server = tokio::spawn(async move {
for _ in 0..N {
server.connect().await?;
let mut inner = BufReader::new(server);
server = ServerOptions::new().create(PIPE_NAME)?;
let _ = tokio::spawn(async move {
let mut buf = String::new();
inner.read_line(&mut buf).await?;
inner.write_all(b"pong\n").await?;
inner.flush().await?;
Ok::<_, io::Error>(())
});
}
Ok::<_, io::Error>(())
});
let mut clients = Vec::new();
for _ in 0..N {
clients.push(tokio::spawn(async move {
let client = loop {
match ClientOptions::new().open(PIPE_NAME) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
Err(e) if e.kind() == io::ErrorKind::NotFound => (),
Err(e) => return Err(e),
}
time::sleep(Duration::from_millis(50)).await;
};
let mut client = BufReader::new(client);
let mut buf = String::new();
client.write_all(b"ping\n").await?;
client.flush().await?;
client.read_line(&mut buf).await?;
Ok::<_, io::Error>(buf)
}));
}
for client in clients {
let result = client.await?;
assert_eq!(result?, "pong\n");
}
server.await??;
Ok(())
}
/// Creates a server in message mode and checks that `connect` succeeds after
/// a client has opened the pipe.
#[tokio::test]
async fn test_named_pipe_mode_message() -> io::Result<()> {
    const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message";

    let mut options = ServerOptions::new();
    options.pipe_mode(PipeMode::Message);
    let server = options.create(PIPE_NAME)?;

    // The client handle is released as soon as it has been opened, i.e.
    // before the server calls `connect`.
    // NOTE(review): presumably intentional (connect after the client has
    // already gone away) — confirm.
    drop(ClientOptions::new().open(PIPE_NAME)?);

    server.connect().await
}
/// Returns the instance count reported for the named pipe `pipe_name`.
///
/// `pipe_name` is the bare pipe name as it is listed under `\\.\Pipe\`
/// (without that prefix). The pipe directory is queried with
/// `NtQueryDirectoryFile`, using `pipe_name` as the file-name filter, and the
/// value found in the `EndOfFile` field of the returned entry is interpreted
/// as the number of instances.
///
/// # Errors
///
/// Returns an error if the name does not fit in a `UNICODE_STRING`, if the
/// pipe directory cannot be opened, if the query does not return success, or
/// if the returned entry name is not valid UTF-16.
///
/// # Panics
///
/// Panics if the entry returned by the query is not named `pipe_name`.
fn num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32> {
    use ntapi::ntioapi;
    use winapi::shared::ntdef;

    let pipe_name = pipe_name.as_ref();

    // `UNICODE_STRING` stores its lengths in bytes as `u16`; reject names that
    // would otherwise be silently truncated by the cast below.
    let mut name_utf16 = pipe_name.encode_utf16().collect::<Vec<_>>();
    let name_bytes = name_utf16.len() * mem::size_of::<u16>();
    if name_bytes > usize::from(u16::MAX) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "pipe name is too long for a UNICODE_STRING",
        ));
    }
    // `name_utf16` stays alive until the end of the function, so `Buffer`
    // remains valid for the duration of the query.
    let mut name = ntdef::UNICODE_STRING {
        Length: name_bytes as u16,
        MaximumLength: name_bytes as u16,
        Buffer: name_utf16.as_mut_ptr(),
    };

    let root = std::fs::File::open(r"\\.\Pipe\")?;
    // SAFETY: the I/O status block is a plain C structure that is only written
    // to by the call below; an all-zero bit pattern is a valid initial value.
    let mut io_status_block = unsafe { mem::zeroed() };

    // Backed by `u64` (not `u8`) so that the buffer is 8-byte aligned, as
    // required to view it as a `FILE_DIRECTORY_INFORMATION` afterwards.
    let mut file_directory_information = [0_u64; 128];
    let buffer_len = mem::size_of_val(&file_directory_information) as u32;

    // SAFETY: every pointer passed refers to a live local that outlives the
    // call, and `buffer_len` is the exact byte size of the output buffer.
    let status = unsafe {
        ntioapi::NtQueryDirectoryFile(
            root.as_raw_handle(),
            std::ptr::null_mut(),
            None,
            std::ptr::null_mut(),
            &mut io_status_block,
            file_directory_information.as_mut_ptr() as *mut _,
            buffer_len,
            ntioapi::FileDirectoryInformation,
            0,
            &mut name,
            0,
        )
    };

    // The failure reason is the returned NTSTATUS itself; the thread's
    // last-error value is not set by this call, so report the status directly.
    if status as u32 != winerror::NO_ERROR {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("NtQueryDirectoryFile failed with NTSTATUS {:#010x}", status),
        ));
    }

    // SAFETY: the buffer is 8-byte aligned, larger than the fixed part of the
    // structure, and was filled in by the successful query above.
    let info = unsafe {
        &*(file_directory_information.as_ptr() as *const ntioapi::FILE_DIRECTORY_INFORMATION)
    };

    // `FileNameLength` is expressed in bytes; the name itself is UTF-16 and
    // extends past the one-element `FileName` array into the buffer.
    // SAFETY: the name was written into `file_directory_information` by the
    // query, directly following the fixed-size part of the entry.
    let raw_name = unsafe {
        std::slice::from_raw_parts(
            info.FileName.as_ptr(),
            info.FileNameLength as usize / mem::size_of::<u16>(),
        )
    };
    let name = String::from_utf16(raw_name)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    // SAFETY: `EndOfFile` is a union over plain integers, so reading it as a
    // 64-bit integer is always valid.
    let num_instances = unsafe { *info.EndOfFile.QuadPart() };

    // Sanity check: the entry returned must be the pipe that was asked for.
    assert_eq!(name, pipe_name);

    Ok(num_instances as u32)
}