#[cfg(doc)]
use std::ptr::null_mut;
use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};
use compio_buf::{BufResult, IoBuf, IoBufMut};
use compio_driver::{impl_raw_fd, op::ConnectNamedPipe, syscall, AsRawFd, RawFd, ToSharedFd};
use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
use widestring::U16CString;
use windows_sys::Win32::{
Storage::FileSystem::{
FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_INBOUND,
PIPE_ACCESS_OUTBOUND, WRITE_DAC, WRITE_OWNER,
},
System::{
Pipes::{
CreateNamedPipeW, DisconnectNamedPipe, GetNamedPipeInfo, PIPE_ACCEPT_REMOTE_CLIENTS,
PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE, PIPE_REJECT_REMOTE_CLIENTS, PIPE_SERVER_END,
PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES,
},
SystemServices::ACCESS_SYSTEM_SECURITY,
},
};
use crate::{AsyncFd, File, OpenOptions};
#[derive(Debug, Clone)]
pub struct NamedPipeServer {
handle: AsyncFd<std::fs::File>,
}
impl NamedPipeServer {
pub fn info(&self) -> io::Result<PipeInfo> {
unsafe { named_pipe_info(self.as_raw_fd()) }
}
pub async fn connect(&self) -> io::Result<()> {
let op = ConnectNamedPipe::new(self.handle.to_shared_fd());
compio_runtime::submit(op).await.0?;
Ok(())
}
pub fn disconnect(&self) -> io::Result<()> {
syscall!(BOOL, DisconnectNamedPipe(self.as_raw_fd() as _))?;
Ok(())
}
}
impl AsyncRead for NamedPipeServer {
#[inline]
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
(&*self).read(buf).await
}
}
impl AsyncRead for &NamedPipeServer {
#[inline]
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
(&self.handle).read(buffer).await
}
}
impl AsyncWrite for NamedPipeServer {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write(buf).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
(&*self).flush().await
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
(&*self).shutdown().await
}
}
impl AsyncWrite for &NamedPipeServer {
#[inline]
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
(&self.handle).write(buffer).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
Ok(())
}
}
impl_raw_fd!(NamedPipeServer, std::fs::File, handle, file);
#[derive(Debug, Clone)]
pub struct NamedPipeClient {
handle: File,
}
impl NamedPipeClient {
pub fn info(&self) -> io::Result<PipeInfo> {
unsafe { named_pipe_info(self.as_raw_fd()) }
}
}
impl AsyncRead for NamedPipeClient {
#[inline]
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
(&*self).read(buf).await
}
}
impl AsyncRead for &NamedPipeClient {
#[inline]
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
self.handle.read_at(buffer, 0).await
}
}
impl AsyncWrite for NamedPipeClient {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
(&*self).write(buf).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
(&*self).flush().await
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
(&*self).shutdown().await
}
}
impl AsyncWrite for &NamedPipeClient {
#[inline]
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
(&self.handle).write_at(buffer, 0).await
}
#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}
#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
Ok(())
}
}
impl_raw_fd!(NamedPipeClient, std::fs::File, handle, file);
#[derive(Debug, Clone)]
pub struct ServerOptions {
access_inbound: bool,
access_outbound: bool,
first_pipe_instance: bool,
write_dac: bool,
write_owner: bool,
access_system_security: bool,
pipe_mode: PipeMode,
reject_remote_clients: bool,
max_instances: u32,
out_buffer_size: u32,
in_buffer_size: u32,
default_timeout: u32,
}
impl ServerOptions {
pub fn new() -> ServerOptions {
ServerOptions {
access_inbound: true,
access_outbound: true,
first_pipe_instance: false,
write_dac: false,
write_owner: false,
access_system_security: false,
pipe_mode: PipeMode::Byte,
reject_remote_clients: true,
max_instances: PIPE_UNLIMITED_INSTANCES,
out_buffer_size: 65536,
in_buffer_size: 65536,
default_timeout: 0,
}
}
pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
self.pipe_mode = pipe_mode;
self
}
pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
self.access_inbound = allowed;
self
}
pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
self.access_outbound = allowed;
self
}
pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
self.first_pipe_instance = first;
self
}
pub fn write_dac(&mut self, requested: bool) -> &mut Self {
self.write_dac = requested;
self
}
pub fn write_owner(&mut self, requested: bool) -> &mut Self {
self.write_owner = requested;
self
}
pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
self.access_system_security = requested;
self
}
pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
self.reject_remote_clients = reject;
self
}
#[track_caller]
pub fn max_instances(&mut self, instances: usize) -> &mut Self {
assert!(instances < 255, "cannot specify more than 254 instances");
self.max_instances = instances as u32;
self
}
pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
self.out_buffer_size = buffer;
self
}
pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
self.in_buffer_size = buffer;
self
}
pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
let addr = U16CString::from_os_str(addr)
.map_err(|e| io::Error::new(std::io::ErrorKind::InvalidData, e))?;
let pipe_mode = {
let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE
} else {
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE
};
if self.reject_remote_clients {
mode |= PIPE_REJECT_REMOTE_CLIENTS;
} else {
mode |= PIPE_ACCEPT_REMOTE_CLIENTS;
}
mode
};
let open_mode = {
let mut mode = FILE_FLAG_OVERLAPPED;
if self.access_inbound {
mode |= PIPE_ACCESS_INBOUND;
}
if self.access_outbound {
mode |= PIPE_ACCESS_OUTBOUND;
}
if self.first_pipe_instance {
mode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
}
if self.write_dac {
mode |= WRITE_DAC;
}
if self.write_owner {
mode |= WRITE_OWNER;
}
if self.access_system_security {
mode |= ACCESS_SYSTEM_SECURITY;
}
mode
};
let h = syscall!(
HANDLE,
CreateNamedPipeW(
addr.as_ptr(),
open_mode,
pipe_mode,
self.max_instances,
self.out_buffer_size,
self.in_buffer_size,
self.default_timeout,
null(),
)
)?;
Ok(NamedPipeServer {
handle: AsyncFd::new(unsafe { std::fs::File::from_raw_handle(h as _) })?,
})
}
}
impl Default for ServerOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct ClientOptions {
options: OpenOptions,
pipe_mode: PipeMode,
}
impl ClientOptions {
pub fn new() -> Self {
use windows_sys::Win32::Storage::FileSystem::SECURITY_IDENTIFICATION;
let mut options = OpenOptions::new();
options
.read(true)
.write(true)
.security_qos_flags(SECURITY_IDENTIFICATION);
Self {
options,
pipe_mode: PipeMode::Byte,
}
}
pub fn read(&mut self, allowed: bool) -> &mut Self {
self.options.read(allowed);
self
}
pub fn write(&mut self, allowed: bool) -> &mut Self {
self.options.write(allowed);
self
}
pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
self.options.security_qos_flags(flags);
self
}
pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
self.pipe_mode = pipe_mode;
self
}
pub async fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
use windows_sys::Win32::System::Pipes::SetNamedPipeHandleState;
let file = self.options.open(addr.as_ref()).await?;
if matches!(self.pipe_mode, PipeMode::Message) {
let mode = PIPE_READMODE_MESSAGE;
syscall!(
BOOL,
SetNamedPipeHandleState(file.as_raw_fd() as _, &mode, null(), null())
)?;
}
Ok(NamedPipeClient { handle: file })
}
}
impl Default for ClientOptions {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeMode {
Byte,
Message,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipeEnd {
Client,
Server,
}
#[derive(Debug)]
pub struct PipeInfo {
pub mode: PipeMode,
pub end: PipeEnd,
pub max_instances: u32,
pub out_buffer_size: u32,
pub in_buffer_size: u32,
}
unsafe fn named_pipe_info(handle: RawFd) -> io::Result<PipeInfo> {
let mut flags = 0;
let mut out_buffer_size = 0;
let mut in_buffer_size = 0;
let mut max_instances = 0;
syscall!(
BOOL,
GetNamedPipeInfo(
handle as _,
&mut flags,
&mut out_buffer_size,
&mut in_buffer_size,
&mut max_instances,
)
)?;
let mut end = PipeEnd::Client;
let mut mode = PipeMode::Byte;
if flags & PIPE_SERVER_END != 0 {
end = PipeEnd::Server;
}
if flags & PIPE_TYPE_MESSAGE != 0 {
mode = PipeMode::Message;
}
Ok(PipeInfo {
end,
mode,
out_buffer_size,
in_buffer_size,
max_instances,
})
}