use std::ffi::c_void;
use std::ffi::OsStr;
use std::io::{self, Read, Write};
use std::pin::Pin;
use std::ptr;
use std::ptr::null_mut;
use std::task::{Context, Poll};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
cfg_io_util! {
use bytes::BufMut;
}
// Windows-only imports, collected in one private module so the rest of the
// file can bring them into scope with a single `use self::doc::*`.
#[cfg(windows)]
mod doc {
// Provides `encode_wide` on `OsStr`, used by `encode_addr`.
pub(super) use crate::os::windows::ffi::OsStrExt;
// Flattened view of the `windows_sys` API groups referenced in this file, so
// call sites can write `windows_sys::Name` regardless of the original group.
pub(super) mod windows_sys {
pub(crate) use windows_sys::{
Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
Win32::System::SystemServices::*,
};
}
// mio's Windows-specific module; supplies the `NamedPipe` type wrapped below.
pub(super) use mio::windows as mio_windows;
}
// Counterpart of the Windows `doc` module for other targets: `NamedPipe` is
// aliased to a crate-provided type so that the struct definitions below still
// name a type. NOTE(review): `NotDefinedHere` is presumably a documentation-only
// placeholder — confirm against `crate::doc`.
#[cfg(not(windows))]
mod doc {
pub(super) mod mio_windows {
pub type NamedPipe = crate::doc::NotDefinedHere;
}
}
use self::doc::*;
/// Server end of a Windows named pipe.
///
/// Values are built by [`ServerOptions::create`] or adopted from an existing
/// handle via [`NamedPipeServer::from_raw_handle`].
#[derive(Debug)]
pub struct NamedPipeServer {
// mio's named pipe wrapped in `PollEvented`; every method goes through it.
io: PollEvented<mio_windows::NamedPipe>,
}
impl NamedPipeServer {
    /// Builds a server from a raw pipe handle.
    ///
    /// The handle is wrapped in mio's `NamedPipe`, which is in turn wrapped in
    /// a `PollEvented`.
    ///
    /// # Safety
    ///
    /// `handle` is forwarded unchanged to
    /// `mio_windows::NamedPipe::from_raw_handle`, so the caller must satisfy
    /// whatever that function requires of the handle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `PollEvented::new`.
    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
        // SAFETY: the caller upholds the contract described above.
        let pipe = unsafe { mio_windows::NamedPipe::from_raw_handle(handle) };
        let io = PollEvented::new(pipe)?;
        Ok(Self { io })
    }

    /// Returns the pipe's properties as reported by `named_pipe_info` for the
    /// underlying raw handle.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `named_pipe_info`.
    pub fn info(&self) -> io::Result<PipeInfo> {
        let handle = self.io.as_raw_handle();
        // SAFETY: the handle is read from the pipe that `self` holds.
        unsafe { named_pipe_info(handle) }
    }

    /// Connects the underlying pipe.
    ///
    /// `connect` is attempted once directly. If that attempt reports
    /// [`io::ErrorKind::WouldBlock`], the call is retried through the
    /// registration's `async_io` with [`Interest::WRITABLE`]; any other
    /// outcome of the first attempt is returned as-is.
    pub async fn connect(&self) -> io::Result<()> {
        match self.io.connect() {
            // Fall through to the asynchronous retry below.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
            settled => return settled,
        }

        self.io
            .registration()
            .async_io(Interest::WRITABLE, || self.io.connect())
            .await
    }

    /// Calls `disconnect` on the underlying pipe and returns its result.
    pub fn disconnect(&self) -> io::Result<()> {
        self.io.disconnect()
    }

    /// Waits on the registration for the given `interest` and returns the
    /// readiness carried by the resulting event.
    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
        self.io
            .registration()
            .readiness(interest)
            .await
            .map(|event| event.ready)
    }

    /// Waits for [`Interest::READABLE`] via [`ready`](Self::ready), discarding
    /// the readiness value.
    pub async fn readable(&self) -> io::Result<()> {
        self.ready(Interest::READABLE).await.map(|_ready| ())
    }

    /// Polls the registration for read readiness, discarding the event on
    /// success.
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let registration = self.io.registration();
        registration.poll_read_ready(cx).map_ok(|_event| ())
    }

    /// Attempts a single `read` into `buf`, run through the registration's
    /// `try_io` with [`Interest::READABLE`].
    ///
    /// Returns the number of bytes read, or the error from `try_io`.
    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || (&*self.io).read(buf))
    }

    /// Attempts a single `read_vectored` into `bufs`, run through the
    /// registration's `try_io` with [`Interest::READABLE`].
    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
    }

    cfg_io_util! {
        /// Attempts a single `read` into the spare chunk of `buf` and advances
        /// the buffer by the number of bytes read.
        ///
        /// The read runs through the registration's `try_io` with
        /// [`Interest::READABLE`].
        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
            self.io.registration().try_io(Interest::READABLE, || {
                use std::io::Read;

                let spare = buf.chunk_mut();
                // SAFETY (relied upon, not checked here): the chunk is viewed as
                // `[u8]` only so it can be passed to `read`, which is expected to
                // write into it rather than read from it.
                let spare = unsafe {
                    &mut *(spare as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
                };

                let filled = (&*self.io).read(spare)?;

                // SAFETY (relied upon): `read` reported `filled` bytes written
                // into the chunk obtained above.
                unsafe {
                    buf.advance_mut(filled);
                }

                Ok(filled)
            })
        }
    }

    /// Waits for [`Interest::WRITABLE`] via [`ready`](Self::ready), discarding
    /// the readiness value.
    pub async fn writable(&self) -> io::Result<()> {
        self.ready(Interest::WRITABLE).await.map(|_ready| ())
    }

    /// Polls the registration for write readiness, discarding the event on
    /// success.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let registration = self.io.registration();
        registration.poll_write_ready(cx).map_ok(|_event| ())
    }

    /// Attempts a single `write` of `buf`, run through the registration's
    /// `try_io` with [`Interest::WRITABLE`].
    ///
    /// Returns the number of bytes written, or the error from `try_io`.
    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
    }

    /// Attempts a single `write_vectored` of `buf`, run through the
    /// registration's `try_io` with [`Interest::WRITABLE`].
    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
    }

    /// Runs the caller-supplied closure `f` through the registration's
    /// `try_io` for the given `interest` and returns its result.
    pub fn try_io<R>(
        &self,
        interest: Interest,
        f: impl FnOnce() -> io::Result<R>,
    ) -> io::Result<R> {
        let registration = self.io.registration();
        registration.try_io(interest, f)
    }

    /// Runs the caller-supplied closure `f` through the registration's
    /// `async_io` for the given `interest` and returns its result.
    pub async fn async_io<R>(
        &self,
        interest: Interest,
        f: impl FnMut() -> io::Result<R>,
    ) -> io::Result<R> {
        let registration = self.io.registration();
        registration.async_io(interest, f).await
    }
}
// `AsyncRead` for the server forwards directly to the inner `PollEvented`.
impl AsyncRead for NamedPipeServer {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// NOTE(review): `PollEvented::poll_read` has to be called in an `unsafe`
// block; its safety contract is defined elsewhere in the crate — confirm
// that it holds for mio's `NamedPipe`.
unsafe { self.io.poll_read(cx, buf) }
}
}
// `AsyncWrite` for the server: writes are forwarded to the inner `PollEvented`;
// flushing and shutting down complete immediately.
impl AsyncWrite for NamedPipeServer {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let io = &self.io;
        io.poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let io = &self.io;
        io.poll_write_vectored(cx, bufs)
    }

    // Nothing is done on flush: this always reports success immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let flushed: io::Result<()> = Ok(());
        Poll::Ready(flushed)
    }

    // Shutdown is defined as a flush.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }
}
// Exposes the raw handle of the wrapped pipe.
impl AsRawHandle for NamedPipeServer {
fn as_raw_handle(&self) -> RawHandle {
// Forwarded to the inner `io` value.
self.io.as_raw_handle()
}
}
// Borrowed-handle view of the server, built on top of `as_raw_handle`.
impl AsHandle for NamedPipeServer {
    fn as_handle(&self) -> BorrowedHandle<'_> {
        let raw = self.as_raw_handle();
        // SAFETY (relied upon): the raw handle was just obtained from `self`,
        // and the returned borrow carries the lifetime of `&self`.
        unsafe { BorrowedHandle::borrow_raw(raw) }
    }
}
/// Client end of a Windows named pipe.
///
/// Values are built by [`ClientOptions::open`] or adopted from an existing
/// handle via [`NamedPipeClient::from_raw_handle`].
#[derive(Debug)]
pub struct NamedPipeClient {
// mio's named pipe wrapped in `PollEvented`; every method goes through it.
io: PollEvented<mio_windows::NamedPipe>,
}
impl NamedPipeClient {
    /// Builds a client from a raw pipe handle.
    ///
    /// The handle is wrapped in mio's `NamedPipe`, which is in turn wrapped in
    /// a `PollEvented`.
    ///
    /// # Safety
    ///
    /// `handle` is forwarded unchanged to
    /// `mio_windows::NamedPipe::from_raw_handle`, so the caller must satisfy
    /// whatever that function requires of the handle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `PollEvented::new`.
    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
        // SAFETY: the caller upholds the contract described above.
        let pipe = unsafe { mio_windows::NamedPipe::from_raw_handle(handle) };
        let io = PollEvented::new(pipe)?;
        Ok(Self { io })
    }

    /// Returns the pipe's properties as reported by `named_pipe_info` for the
    /// underlying raw handle.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `named_pipe_info`.
    pub fn info(&self) -> io::Result<PipeInfo> {
        let handle = self.io.as_raw_handle();
        // SAFETY: the handle is read from the pipe that `self` holds.
        unsafe { named_pipe_info(handle) }
    }

    /// Waits on the registration for the given `interest` and returns the
    /// readiness carried by the resulting event.
    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
        self.io
            .registration()
            .readiness(interest)
            .await
            .map(|event| event.ready)
    }

    /// Waits for [`Interest::READABLE`] via [`ready`](Self::ready), discarding
    /// the readiness value.
    pub async fn readable(&self) -> io::Result<()> {
        self.ready(Interest::READABLE).await.map(|_ready| ())
    }

    /// Polls the registration for read readiness, discarding the event on
    /// success.
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let registration = self.io.registration();
        registration.poll_read_ready(cx).map_ok(|_event| ())
    }

    /// Attempts a single `read` into `buf`, run through the registration's
    /// `try_io` with [`Interest::READABLE`].
    ///
    /// Returns the number of bytes read, or the error from `try_io`.
    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || (&*self.io).read(buf))
    }

    /// Attempts a single `read_vectored` into `bufs`, run through the
    /// registration's `try_io` with [`Interest::READABLE`].
    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
    }

    cfg_io_util! {
        /// Attempts a single `read` into the spare chunk of `buf` and advances
        /// the buffer by the number of bytes read.
        ///
        /// The read runs through the registration's `try_io` with
        /// [`Interest::READABLE`].
        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
            self.io.registration().try_io(Interest::READABLE, || {
                use std::io::Read;

                let spare = buf.chunk_mut();
                // SAFETY (relied upon, not checked here): the chunk is viewed as
                // `[u8]` only so it can be passed to `read`, which is expected to
                // write into it rather than read from it.
                let spare = unsafe {
                    &mut *(spare as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
                };

                let filled = (&*self.io).read(spare)?;

                // SAFETY (relied upon): `read` reported `filled` bytes written
                // into the chunk obtained above.
                unsafe {
                    buf.advance_mut(filled);
                }

                Ok(filled)
            })
        }
    }

    /// Waits for [`Interest::WRITABLE`] via [`ready`](Self::ready), discarding
    /// the readiness value.
    pub async fn writable(&self) -> io::Result<()> {
        self.ready(Interest::WRITABLE).await.map(|_ready| ())
    }

    /// Polls the registration for write readiness, discarding the event on
    /// success.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let registration = self.io.registration();
        registration.poll_write_ready(cx).map_ok(|_event| ())
    }

    /// Attempts a single `write` of `buf`, run through the registration's
    /// `try_io` with [`Interest::WRITABLE`].
    ///
    /// Returns the number of bytes written, or the error from `try_io`.
    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
    }

    /// Attempts a single `write_vectored` of `buf`, run through the
    /// registration's `try_io` with [`Interest::WRITABLE`].
    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
        let registration = self.io.registration();
        registration.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
    }

    /// Runs the caller-supplied closure `f` through the registration's
    /// `try_io` for the given `interest` and returns its result.
    pub fn try_io<R>(
        &self,
        interest: Interest,
        f: impl FnOnce() -> io::Result<R>,
    ) -> io::Result<R> {
        let registration = self.io.registration();
        registration.try_io(interest, f)
    }

    /// Runs the caller-supplied closure `f` through the registration's
    /// `async_io` for the given `interest` and returns its result.
    pub async fn async_io<R>(
        &self,
        interest: Interest,
        f: impl FnMut() -> io::Result<R>,
    ) -> io::Result<R> {
        let registration = self.io.registration();
        registration.async_io(interest, f).await
    }
}
// `AsyncRead` for the client forwards directly to the inner `PollEvented`.
impl AsyncRead for NamedPipeClient {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// NOTE(review): `PollEvented::poll_read` has to be called in an `unsafe`
// block; its safety contract is defined elsewhere in the crate — confirm
// that it holds for mio's `NamedPipe`.
unsafe { self.io.poll_read(cx, buf) }
}
}
// `AsyncWrite` for the client: writes are forwarded to the inner `PollEvented`;
// flushing and shutting down complete immediately.
impl AsyncWrite for NamedPipeClient {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let io = &self.io;
        io.poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let io = &self.io;
        io.poll_write_vectored(cx, bufs)
    }

    // Nothing is done on flush: this always reports success immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let flushed: io::Result<()> = Ok(());
        Poll::Ready(flushed)
    }

    // Shutdown is defined as a flush.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }
}
// Exposes the raw handle of the wrapped pipe.
impl AsRawHandle for NamedPipeClient {
fn as_raw_handle(&self) -> RawHandle {
// Forwarded to the inner `io` value.
self.io.as_raw_handle()
}
}
// Borrowed-handle view of the client, built on top of `as_raw_handle`.
impl AsHandle for NamedPipeClient {
    fn as_handle(&self) -> BorrowedHandle<'_> {
        let raw = self.as_raw_handle();
        // SAFETY (relied upon): the raw handle was just obtained from `self`,
        // and the returned borrow carries the lifetime of `&self`.
        unsafe { BorrowedHandle::borrow_raw(raw) }
    }
}
/// Builder holding the settings used to create a [`NamedPipeServer`].
///
/// Start from [`ServerOptions::new`], adjust the settings with the setter
/// methods, then call [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
// Adds `PIPE_ACCESS_INBOUND` to the open mode when set.
access_inbound: bool,
// Adds `PIPE_ACCESS_OUTBOUND` to the open mode when set.
access_outbound: bool,
// Adds `FILE_FLAG_FIRST_PIPE_INSTANCE` to the open mode when set.
first_pipe_instance: bool,
// Adds `WRITE_DAC` to the open mode when set.
write_dac: bool,
// Adds `WRITE_OWNER` to the open mode when set.
write_owner: bool,
// Adds `ACCESS_SYSTEM_SECURITY` to the open mode when set.
access_system_security: bool,
// Selects the message or byte type/read-mode flags.
pipe_mode: PipeMode,
// Chooses between `PIPE_REJECT_REMOTE_CLIENTS` and `PIPE_ACCEPT_REMOTE_CLIENTS`.
reject_remote_clients: bool,
// Passed to `CreateNamedPipeW` as the maximum instance count.
max_instances: u32,
// Passed to `CreateNamedPipeW` as the output buffer size.
out_buffer_size: u32,
// Passed to `CreateNamedPipeW` as the input buffer size.
in_buffer_size: u32,
// Passed to `CreateNamedPipeW` as the default timeout; initialised to 0 and
// not changed by any setter in this file.
default_timeout: u32,
}
impl ServerOptions {
    /// Creates the default set of server options.
    ///
    /// Defaults: inbound and outbound access enabled, byte pipe mode, remote
    /// clients rejected, `PIPE_UNLIMITED_INSTANCES` instances, 65536-byte input
    /// and output buffers, a default timeout of 0, and every other flag off.
    pub fn new() -> ServerOptions {
        Self {
            pipe_mode: PipeMode::Byte,
            access_inbound: true,
            access_outbound: true,
            reject_remote_clients: true,
            first_pipe_instance: false,
            write_dac: false,
            write_owner: false,
            access_system_security: false,
            max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
            in_buffer_size: 65536,
            out_buffer_size: 65536,
            default_timeout: 0,
        }
    }

    /// Sets the pipe mode (byte or message) used when the pipe is created.
    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
        self.pipe_mode = pipe_mode;
        self
    }

    /// Controls whether `PIPE_ACCESS_INBOUND` is included in the open mode.
    pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
        self.access_inbound = allowed;
        self
    }

    /// Controls whether `PIPE_ACCESS_OUTBOUND` is included in the open mode.
    pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
        self.access_outbound = allowed;
        self
    }

    /// Controls whether `FILE_FLAG_FIRST_PIPE_INSTANCE` is included in the
    /// open mode.
    pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
        self.first_pipe_instance = first;
        self
    }

    /// Controls whether `WRITE_DAC` is included in the open mode.
    pub fn write_dac(&mut self, requested: bool) -> &mut Self {
        self.write_dac = requested;
        self
    }

    /// Controls whether `WRITE_OWNER` is included in the open mode.
    pub fn write_owner(&mut self, requested: bool) -> &mut Self {
        self.write_owner = requested;
        self
    }

    /// Controls whether `ACCESS_SYSTEM_SECURITY` is included in the open mode.
    pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
        self.access_system_security = requested;
        self
    }

    /// Chooses between `PIPE_REJECT_REMOTE_CLIENTS` (`true`) and
    /// `PIPE_ACCEPT_REMOTE_CLIENTS` (`false`) in the pipe mode.
    pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
        self.reject_remote_clients = reject;
        self
    }

    /// Sets the maximum number of pipe instances.
    ///
    /// # Panics
    ///
    /// Panics if `instances` is 255 or greater.
    #[track_caller]
    pub fn max_instances(&mut self, instances: usize) -> &mut Self {
        assert!(instances < 255, "cannot specify more than 254 instances");
        // Lossless: the assertion above bounds the value well below `u32::MAX`.
        let instances = instances as u32;
        self.max_instances = instances;
        self
    }

    /// Sets the output buffer size passed to `CreateNamedPipeW`.
    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.out_buffer_size = buffer;
        self
    }

    /// Sets the input buffer size passed to `CreateNamedPipeW`.
    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.in_buffer_size = buffer;
        self
    }

    /// Creates the named pipe at `addr` with a null security-attributes
    /// pointer.
    ///
    /// # Errors
    ///
    /// See [`create_with_security_attributes_raw`](Self::create_with_security_attributes_raw).
    pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
        let no_attrs = ptr::null_mut();
        // SAFETY (relied upon): a null attributes pointer is passed, so there
        // is no caller-provided memory whose validity must be guaranteed.
        unsafe { self.create_with_security_attributes_raw(addr, no_attrs) }
    }

    /// Creates the named pipe at `addr`, passing `attrs` to `CreateNamedPipeW`
    /// as its security-attributes argument.
    ///
    /// # Safety
    ///
    /// `attrs` is handed to `CreateNamedPipeW` without inspection; the caller
    /// must ensure it is null or otherwise valid for that call.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `CreateNamedPipeW` yields
    /// `INVALID_HANDLE_VALUE`, or the error from
    /// [`NamedPipeServer::from_raw_handle`].
    pub unsafe fn create_with_security_attributes_raw(
        &self,
        addr: impl AsRef<OsStr>,
        attrs: *mut c_void,
    ) -> io::Result<NamedPipeServer> {
        let addr = encode_addr(addr);

        // Pipe mode: type/read-mode pair plus the remote-client policy.
        let type_flags = match self.pipe_mode {
            PipeMode::Message => {
                windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
            }
            PipeMode::Byte => windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE,
        };
        let remote_flag = if self.reject_remote_clients {
            windows_sys::PIPE_REJECT_REMOTE_CLIENTS
        } else {
            windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS
        };
        let pipe_mode = type_flags | remote_flag;

        // Open mode: always overlapped, plus each flag whose option is enabled.
        let optional_flags = [
            (self.access_inbound, windows_sys::PIPE_ACCESS_INBOUND),
            (self.access_outbound, windows_sys::PIPE_ACCESS_OUTBOUND),
            (
                self.first_pipe_instance,
                windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE,
            ),
            (self.write_dac, windows_sys::WRITE_DAC),
            (self.write_owner, windows_sys::WRITE_OWNER),
            (
                self.access_system_security,
                windows_sys::ACCESS_SYSTEM_SECURITY,
            ),
        ];
        let open_mode = optional_flags
            .iter()
            .filter(|(enabled, _)| *enabled)
            .fold(windows_sys::FILE_FLAG_OVERLAPPED, |mode, (_, flag)| {
                mode | *flag
            });

        // SAFETY (relied upon): `addr` is the NUL-terminated buffer built by
        // `encode_addr` and outlives the call; `attrs` is covered by this
        // function's own safety contract.
        let h = unsafe {
            windows_sys::CreateNamedPipeW(
                addr.as_ptr(),
                open_mode,
                pipe_mode,
                self.max_instances,
                self.out_buffer_size,
                self.in_buffer_size,
                self.default_timeout,
                attrs as *mut _,
            )
        };

        if h == windows_sys::INVALID_HANDLE_VALUE {
            return Err(io::Error::last_os_error());
        }

        // SAFETY (relied upon): `h` was just returned by `CreateNamedPipeW`
        // and was opened with `FILE_FLAG_OVERLAPPED`.
        unsafe { NamedPipeServer::from_raw_handle(h as _) }
    }
}
/// Builder holding the settings used to open a [`NamedPipeClient`].
///
/// Start from [`ClientOptions::new`], adjust the settings with the setter
/// methods, then call [`ClientOptions::open`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
// Adds `GENERIC_READ` to the desired access when set.
generic_read: bool,
// Adds `GENERIC_WRITE` to the desired access when set.
generic_write: bool,
// Flags OR-ed with `FILE_FLAG_OVERLAPPED` and passed to `CreateFileW`.
security_qos_flags: u32,
// When `PipeMode::Message`, the opened handle is switched to message read mode.
pipe_mode: PipeMode,
}
impl ClientOptions {
    /// Creates the default set of client options.
    ///
    /// Defaults: read and write access requested, byte pipe mode, and security
    /// QoS flags `SECURITY_IDENTIFICATION | SECURITY_SQOS_PRESENT`.
    pub fn new() -> Self {
        Self {
            generic_read: true,
            generic_write: true,
            security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
                | windows_sys::SECURITY_SQOS_PRESENT,
            pipe_mode: PipeMode::Byte,
        }
    }

    /// Controls whether `GENERIC_READ` is requested when opening the pipe.
    pub fn read(&mut self, allowed: bool) -> &mut Self {
        self.generic_read = allowed;
        self
    }

    /// Controls whether `GENERIC_WRITE` is requested when opening the pipe.
    pub fn write(&mut self, allowed: bool) -> &mut Self {
        self.generic_write = allowed;
        self
    }

    /// Sets the security QoS flags passed to `CreateFileW`.
    ///
    /// `SECURITY_SQOS_PRESENT` is always OR-ed into the supplied `flags`.
    pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
        self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
        self
    }

    /// Sets the pipe mode; `PipeMode::Message` makes `open` switch the handle
    /// to message read mode after it has been opened.
    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
        self.pipe_mode = pipe_mode;
        self
    }

    /// Opens the named pipe at `addr` with a null security-attributes pointer.
    ///
    /// # Errors
    ///
    /// See [`open_with_security_attributes_raw`](Self::open_with_security_attributes_raw).
    pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
        // SAFETY (relied upon): a null attributes pointer is passed, so there
        // is no caller-provided memory whose validity must be guaranteed.
        unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
    }

    /// Opens the named pipe at `addr`, passing `attrs` to `CreateFileW` as its
    /// security-attributes argument.
    ///
    /// When the configured pipe mode is `PipeMode::Message`, the handle is
    /// additionally switched to `PIPE_READMODE_MESSAGE` before it is wrapped.
    ///
    /// # Safety
    ///
    /// `attrs` is handed to `CreateFileW` without inspection; the caller must
    /// ensure it is null or otherwise valid for that call.
    ///
    /// # Errors
    ///
    /// Returns the last OS error if `CreateFileW` yields
    /// `INVALID_HANDLE_VALUE` or if `SetNamedPipeHandleState` fails (in which
    /// case the freshly opened handle is closed before returning), or the
    /// error from [`NamedPipeClient::from_raw_handle`].
    pub unsafe fn open_with_security_attributes_raw(
        &self,
        addr: impl AsRef<OsStr>,
        attrs: *mut c_void,
    ) -> io::Result<NamedPipeClient> {
        let addr = encode_addr(addr);

        let desired_access = {
            let mut access = 0;
            if self.generic_read {
                access |= windows_sys::GENERIC_READ;
            }
            if self.generic_write {
                access |= windows_sys::GENERIC_WRITE;
            }
            access
        };

        // SAFETY (relied upon): `addr` is the NUL-terminated buffer built by
        // `encode_addr` and outlives the call; `attrs` is covered by this
        // function's own safety contract.
        let h = unsafe {
            windows_sys::CreateFileW(
                addr.as_ptr(),
                desired_access,
                0,
                attrs as *mut _,
                windows_sys::OPEN_EXISTING,
                self.get_flags(),
                null_mut(),
            )
        };

        if h == windows_sys::INVALID_HANDLE_VALUE {
            return Err(io::Error::last_os_error());
        }

        if matches!(self.pipe_mode, PipeMode::Message) {
            let mode = windows_sys::PIPE_READMODE_MESSAGE;
            // SAFETY (relied upon): `h` is the handle just returned by
            // `CreateFileW`, and `mode` lives for the duration of the call.
            let result = unsafe {
                windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut())
            };

            if result == 0 {
                // Capture the error first: closing the handle may overwrite
                // the thread's last-error value.
                let err = io::Error::last_os_error();
                // Nothing owns `h` yet, so it must be closed here or it leaks.
                // The close result is ignored; the original failure is what
                // the caller needs to see.
                // SAFETY: `h` is a valid handle from `CreateFileW` above that
                // has not been handed to any owner.
                let _ = unsafe { windows_sys::CloseHandle(h) };
                return Err(err);
            }
        }

        // SAFETY (relied upon): `h` was just returned by `CreateFileW` and was
        // opened with `FILE_FLAG_OVERLAPPED` (see `get_flags`).
        unsafe { NamedPipeClient::from_raw_handle(h as _) }
    }

    /// Flags passed to `CreateFileW`: the configured security QoS flags plus
    /// `FILE_FLAG_OVERLAPPED`.
    fn get_flags(&self) -> u32 {
        self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
    }
}
/// The mode of a named pipe, as configured on the option builders and as
/// reported in [`PipeInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
/// Byte mode; maps to `PIPE_TYPE_BYTE | PIPE_READMODE_BYTE` when a server is
/// created.
Byte,
/// Message mode; maps to `PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE` when a
/// server is created, and makes a client switch its handle to
/// `PIPE_READMODE_MESSAGE`.
Message,
}
/// Which end of a named pipe a handle refers to, as reported in [`PipeInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
/// Reported when the `PIPE_SERVER_END` flag is not set for the handle.
Client,
/// Reported when the `PIPE_SERVER_END` flag is set for the handle.
Server,
}
/// Properties of a named pipe, as returned by `NamedPipeServer::info` and
/// `NamedPipeClient::info`.
///
/// The values are derived from a `GetNamedPipeInfo` call on the pipe's handle.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PipeInfo {
/// `Message` if the reported flags include `PIPE_TYPE_MESSAGE`, else `Byte`.
pub mode: PipeMode,
/// `Server` if the reported flags include `PIPE_SERVER_END`, else `Client`.
pub end: PipeEnd,
/// Maximum instance count reported by `GetNamedPipeInfo`.
pub max_instances: u32,
/// Output buffer size reported by `GetNamedPipeInfo`.
pub out_buffer_size: u32,
/// Input buffer size reported by `GetNamedPipeInfo`.
pub in_buffer_size: u32,
}
fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
let len = addr.as_ref().encode_wide().count();
let mut vec = Vec::with_capacity(len + 1);
vec.extend(addr.as_ref().encode_wide());
vec.push(0);
vec.into_boxed_slice()
}
/// Queries `GetNamedPipeInfo` for `handle` and converts the result into a
/// [`PipeInfo`].
///
/// # Safety
///
/// `handle` is passed to `GetNamedPipeInfo` as-is; the caller must ensure it
/// is a handle that call can legitimately be made on.
///
/// # Errors
///
/// Returns the last OS error if `GetNamedPipeInfo` returns 0.
unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
    let (mut flags, mut out_buffer_size, mut in_buffer_size, mut max_instances) = (0, 0, 0, 0);

    // SAFETY: the out-pointers reference the locals above; `handle` is covered
    // by this function's own safety contract.
    let status = unsafe {
        windows_sys::GetNamedPipeInfo(
            handle as _,
            &mut flags,
            &mut out_buffer_size,
            &mut in_buffer_size,
            &mut max_instances,
        )
    };

    if status == 0 {
        return Err(io::Error::last_os_error());
    }

    let end = if (flags & windows_sys::PIPE_SERVER_END) != 0 {
        PipeEnd::Server
    } else {
        PipeEnd::Client
    };

    let mode = if (flags & windows_sys::PIPE_TYPE_MESSAGE) != 0 {
        PipeMode::Message
    } else {
        PipeMode::Byte
    };

    Ok(PipeInfo {
        mode,
        end,
        max_instances,
        out_buffer_size,
        in_buffer_size,
    })
}