use crate::error::{IpcError, Result};
use std::io::{Read, Write};
pub struct PipeReader {
#[cfg(unix)]
inner: std::os::unix::io::OwnedFd,
#[cfg(windows)]
inner: windows::PipeHandle,
}
pub struct PipeWriter {
#[cfg(unix)]
inner: std::os::unix::io::OwnedFd,
#[cfg(windows)]
inner: windows::PipeHandle,
}
pub struct AnonymousPipe {
reader: PipeReader,
writer: PipeWriter,
}
impl AnonymousPipe {
pub fn new() -> Result<Self> {
#[cfg(unix)]
{
unix::create_anonymous_pipe()
}
#[cfg(windows)]
{
windows::create_anonymous_pipe()
}
}
pub fn split(self) -> (PipeReader, PipeWriter) {
(self.reader, self.writer)
}
pub fn reader(&self) -> &PipeReader {
&self.reader
}
pub fn writer(&self) -> &PipeWriter {
&self.writer
}
pub fn reader_mut(&mut self) -> &mut PipeReader {
&mut self.reader
}
pub fn writer_mut(&mut self) -> &mut PipeWriter {
&mut self.writer
}
}
pub struct NamedPipe {
name: String,
#[cfg(unix)]
inner: unix::UnixPipeInner,
#[cfg(windows)]
inner: windows::PipeHandle,
is_server: bool,
}
impl NamedPipe {
pub fn create(name: &str) -> Result<Self> {
#[cfg(unix)]
{
unix::create_named_pipe(name)
}
#[cfg(windows)]
{
windows::create_named_pipe(name)
}
}
pub fn connect(name: &str) -> Result<Self> {
#[cfg(unix)]
{
unix::connect_named_pipe(name)
}
#[cfg(windows)]
{
windows::connect_named_pipe(name)
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn is_server(&self) -> bool {
self.is_server
}
pub fn wait_for_client(&mut self) -> Result<()> {
if !self.is_server {
return Err(IpcError::InvalidState(
"Only server can wait for clients".into(),
));
}
#[cfg(unix)]
{
unix::wait_for_client(self)
}
#[cfg(windows)]
{
windows::wait_for_client(&self.inner)
}
}
#[cfg(windows)]
pub fn disconnect(&self) -> Result<()> {
if !self.is_server {
return Err(IpcError::InvalidState(
"Only server can disconnect clients".into(),
));
}
windows::disconnect_named_pipe(&self.inner)
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for PipeReader {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
std::os::unix::io::AsRawFd::as_raw_fd(&self.inner)
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for PipeWriter {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
std::os::unix::io::AsRawFd::as_raw_fd(&self.inner)
}
}
impl Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let fd = self.inner.as_raw_fd();
let ret = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
if ret < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(ret as usize)
}
}
#[cfg(windows)]
{
windows::read_pipe(&self.inner, buf)
}
}
}
impl Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let fd = self.inner.as_raw_fd();
let ret = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) };
if ret < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(ret as usize)
}
}
#[cfg(windows)]
{
windows::write_pipe(&self.inner, buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
#[cfg(unix)]
{
unix::read_pipe(self, buf)
}
#[cfg(windows)]
{
windows::read_pipe(&self.inner, buf)
}
}
}
impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
#[cfg(unix)]
{
unix::write_pipe(self, buf)
}
#[cfg(windows)]
{
windows::write_pipe(&self.inner, buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
#[cfg(unix)]
{
unix::flush_pipe(self)
}
#[cfg(windows)]
{
Ok(())
}
}
}
#[cfg(unix)]
mod unix {
use super::*;
use std::os::unix::io::{FromRawFd, OwnedFd};
use std::os::unix::net::{UnixListener, UnixStream};
pub enum UnixPipeInner {
Listener {
listener: UnixListener,
path: String,
},
Connected(UnixStream),
}
impl UnixPipeInner {
pub fn as_stream_mut(&mut self) -> Option<&mut UnixStream> {
match self {
UnixPipeInner::Connected(stream) => Some(stream),
_ => None,
}
}
}
pub fn create_anonymous_pipe() -> Result<AnonymousPipe> {
let mut fds = [0i32; 2];
let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
if ret < 0 {
return Err(IpcError::Io(std::io::Error::last_os_error()));
}
let reader = PipeReader {
inner: unsafe { OwnedFd::from_raw_fd(fds[0]) },
};
let writer = PipeWriter {
inner: unsafe { OwnedFd::from_raw_fd(fds[1]) },
};
Ok(AnonymousPipe { reader, writer })
}
pub fn create_named_pipe(name: &str) -> Result<NamedPipe> {
let path = if name.starts_with('/') {
name.to_string()
} else {
format!("/tmp/{}.sock", name)
};
let _ = std::fs::remove_file(&path);
let listener = UnixListener::bind(&path).map_err(|e| match e.kind() {
std::io::ErrorKind::PermissionDenied => IpcError::PermissionDenied(path.clone()),
_ => IpcError::Io(e),
})?;
Ok(NamedPipe {
name: path.clone(),
inner: UnixPipeInner::Listener { listener, path },
is_server: true,
})
}
pub fn connect_named_pipe(name: &str) -> Result<NamedPipe> {
let path = if name.starts_with('/') {
name.to_string()
} else {
format!("/tmp/{}.sock", name)
};
let stream = UnixStream::connect(&path).map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => IpcError::NotFound(path.clone()),
std::io::ErrorKind::PermissionDenied => IpcError::PermissionDenied(path.clone()),
std::io::ErrorKind::ConnectionRefused => {
IpcError::NotFound(format!("Connection refused: {}", path))
}
_ => IpcError::Io(e),
})?;
Ok(NamedPipe {
name: path,
inner: UnixPipeInner::Connected(stream),
is_server: false,
})
}
pub fn wait_for_client(pipe: &mut NamedPipe) -> Result<()> {
match &pipe.inner {
UnixPipeInner::Listener { listener, path: _ } => {
let (stream, _) = listener.accept()?;
pipe.inner = UnixPipeInner::Connected(stream);
Ok(())
}
UnixPipeInner::Connected(_) => {
Ok(())
}
}
}
pub fn read_pipe(pipe: &mut NamedPipe, buf: &mut [u8]) -> std::io::Result<usize> {
match pipe.inner.as_stream_mut() {
Some(stream) => stream.read(buf),
None => Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Pipe not connected",
)),
}
}
pub fn write_pipe(pipe: &mut NamedPipe, buf: &[u8]) -> std::io::Result<usize> {
match pipe.inner.as_stream_mut() {
Some(stream) => stream.write(buf),
None => Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Pipe not connected",
)),
}
}
pub fn flush_pipe(pipe: &mut NamedPipe) -> std::io::Result<()> {
match pipe.inner.as_stream_mut() {
Some(stream) => stream.flush(),
None => Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Pipe not connected",
)),
}
}
impl Drop for UnixPipeInner {
fn drop(&mut self) {
if let UnixPipeInner::Listener { path, .. } = self {
let _ = std::fs::remove_file(path);
}
}
}
}
#[cfg(windows)]
mod windows {
use super::*;
use std::ffi::OsStr;
use std::os::windows::ffi::OsStrExt;
use std::ptr;
use windows_sys::Win32::Foundation::*;
use windows_sys::Win32::Storage::FileSystem::*;
use windows_sys::Win32::System::Pipes::*;
pub struct PipeHandle {
handle: HANDLE,
}
impl PipeHandle {
pub fn new(handle: HANDLE) -> Self {
Self { handle }
}
pub fn as_raw(&self) -> HANDLE {
self.handle
}
}
impl Drop for PipeHandle {
fn drop(&mut self) {
if self.handle != INVALID_HANDLE_VALUE {
unsafe { CloseHandle(self.handle) };
}
}
}
unsafe impl Send for PipeHandle {}
unsafe impl Sync for PipeHandle {}
fn to_wide(s: &str) -> Vec<u16> {
OsStr::new(s).encode_wide().chain(Some(0)).collect()
}
pub fn create_anonymous_pipe() -> Result<AnonymousPipe> {
let mut read_handle: HANDLE = INVALID_HANDLE_VALUE;
let mut write_handle: HANDLE = INVALID_HANDLE_VALUE;
let ret = unsafe { CreatePipe(&mut read_handle, &mut write_handle, ptr::null(), 0) };
if ret == 0 {
return Err(IpcError::Io(std::io::Error::last_os_error()));
}
Ok(AnonymousPipe {
reader: PipeReader {
inner: PipeHandle::new(read_handle),
},
writer: PipeWriter {
inner: PipeHandle::new(write_handle),
},
})
}
pub fn create_named_pipe(name: &str) -> Result<NamedPipe> {
let pipe_name = if name.starts_with(r"\\.\pipe\") {
name.to_string()
} else {
format!(r"\\.\pipe\{}", name)
};
let wide_name = to_wide(&pipe_name);
let handle = unsafe {
CreateNamedPipeW(
wide_name.as_ptr(),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
4096,
4096,
0,
ptr::null(),
)
};
if handle == INVALID_HANDLE_VALUE {
return Err(IpcError::Io(std::io::Error::last_os_error()));
}
Ok(NamedPipe {
name: pipe_name,
inner: PipeHandle::new(handle),
is_server: true,
})
}
pub fn connect_named_pipe(name: &str) -> Result<NamedPipe> {
let pipe_name = if name.starts_with(r"\\.\pipe\") {
name.to_string()
} else {
format!(r"\\.\pipe\{}", name)
};
let wide_name = to_wide(&pipe_name);
let handle = unsafe {
CreateFileW(
wide_name.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
0,
ptr::null(),
OPEN_EXISTING,
0,
INVALID_HANDLE_VALUE,
)
};
if handle == INVALID_HANDLE_VALUE {
let err = std::io::Error::last_os_error();
return Err(match err.raw_os_error() {
Some(2) => IpcError::NotFound(pipe_name), Some(5) => IpcError::PermissionDenied(pipe_name), _ => IpcError::Io(err),
});
}
Ok(NamedPipe {
name: pipe_name,
inner: PipeHandle::new(handle),
is_server: false,
})
}
pub fn wait_for_client(handle: &PipeHandle) -> Result<()> {
let ret = unsafe { ConnectNamedPipe(handle.as_raw(), ptr::null_mut()) };
if ret == 0 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(535) {
return Err(IpcError::Io(err));
}
}
Ok(())
}
pub fn disconnect_named_pipe(handle: &PipeHandle) -> Result<()> {
let ret = unsafe { DisconnectNamedPipe(handle.as_raw()) };
if ret == 0 {
return Err(IpcError::Io(std::io::Error::last_os_error()));
}
Ok(())
}
pub fn read_pipe(handle: &PipeHandle, buf: &mut [u8]) -> std::io::Result<usize> {
let mut bytes_read: u32 = 0;
let ret = unsafe {
ReadFile(
handle.as_raw(),
buf.as_mut_ptr() as *mut _,
buf.len() as u32,
&mut bytes_read,
ptr::null_mut(),
)
};
if ret == 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(bytes_read as usize)
}
}
pub fn write_pipe(handle: &PipeHandle, buf: &[u8]) -> std::io::Result<usize> {
let mut bytes_written: u32 = 0;
let ret = unsafe {
WriteFile(
handle.as_raw(),
buf.as_ptr() as *const _,
buf.len() as u32,
&mut bytes_written,
ptr::null_mut(),
)
};
if ret == 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(bytes_written as usize)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_anonymous_pipe() {
let pipe = AnonymousPipe::new().unwrap();
let (mut reader, mut writer) = pipe.split();
let msg = b"Hello, IPC!";
writer.write_all(msg).unwrap();
let mut buf = [0u8; 32];
let n = reader.read(&mut buf).unwrap();
assert_eq!(&buf[..n], msg);
}
}