use crate::ipc::IpcMessage;
use postcard;
use serde_core;
use thiserror::Error;
use std::{
cell::RefCell,
cmp::PartialEq,
convert::TryInto,
env,
ffi::CString,
fmt, io,
marker::{Send, Sync},
mem,
ops::{Deref, DerefMut, RangeFrom},
ptr, slice,
sync::LazyLock,
thread,
time::Duration,
};
use uuid::Uuid;
use windows::{
core::{Error as WinError, HRESULT, PCSTR},
Win32::{
Foundation::{
CloseHandle, CompareObjectHandles, DuplicateHandle, GetLastError,
DUPLICATE_CLOSE_SOURCE, DUPLICATE_HANDLE_OPTIONS, DUPLICATE_SAME_ACCESS,
ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NOT_FOUND,
ERROR_NO_DATA, ERROR_PIPE_CONNECTED, HANDLE, INVALID_HANDLE_VALUE, WAIT_TIMEOUT,
},
Storage::FileSystem::{
CreateFileA, ReadFile, WriteFile, FILE_ATTRIBUTE_NORMAL, FILE_FLAG_OVERLAPPED,
FILE_GENERIC_READ, FILE_GENERIC_WRITE, FILE_SHARE_MODE, OPEN_EXISTING,
PIPE_ACCESS_DUPLEX,
},
System::{
Memory::{
CreateFileMappingA, MapViewOfFile, UnmapViewOfFile, FILE_MAP_ALL_ACCESS,
MEMORY_MAPPED_VIEW_ADDRESS, PAGE_READWRITE, SEC_COMMIT,
},
Pipes::{
ConnectNamedPipe, CreateNamedPipeA, GetNamedPipeServerProcessId,
PIPE_READMODE_BYTE, PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE,
},
Threading::{
CreateEventA, GetCurrentProcess, OpenProcess, ResetEvent, INFINITE,
PROCESS_DUP_HANDLE,
},
IO::{
CancelIoEx, CreateIoCompletionPort, GetOverlappedResult, GetOverlappedResultEx,
GetQueuedCompletionStatus, OVERLAPPED,
},
},
},
};
mod aliased_cell;
use self::aliased_cell::AliasedCell;
#[cfg(test)]
mod tests;
/// Error returned by the non-blocking and timed selection methods of `OsIpcReceiverSet`.
#[derive(Debug, Error)]
pub enum OsTrySelectError {
/// An underlying operation failed; wraps the originating `WinIpcError`.
#[error("Error in IO: {0}.")]
IoError(#[from] WinIpcError),
/// Nothing was ready: no message was received and no receiver was found closed.
#[error("No messages were received and no disconnections occurred.")]
Empty,
}
/// Id of the current process, computed once on first access.
static CURRENT_PROCESS_ID: LazyLock<u32> = LazyLock::new(std::process::id);
/// Result of `GetCurrentProcess()` wrapped in a `WinHandle`, computed once on first access.
// NOTE(review): `GetCurrentProcess` is documented to return a pseudo-handle; presumably it
// compares equal to `INVALID_HANDLE_VALUE`, so `WinHandle`'s `is_valid()` is false for it and
// `Drop` never tries to close it — confirm.
static CURRENT_PROCESS_HANDLE: LazyLock<WinHandle> =
LazyLock::new(|| WinHandle::new(unsafe { GetCurrentProcess() }));
/// Transparent wrapper whose `Debug` output is deliberately empty.
///
/// It lets a containing type derive `Debug` without printing the wrapped value;
/// the inner value stays reachable through `Deref` / `DerefMut`.
struct NoDebug<T>(T);

impl<T> Deref for NoDebug<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        let NoDebug(inner) = self;
        inner
    }
}

impl<T> DerefMut for NoDebug<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let NoDebug(inner) = self;
        inner
    }
}

impl<T> fmt::Debug for NoDebug<T> {
    /// Writes nothing and always succeeds.
    fn fmt(&self, _formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        Ok(())
    }
}
/// `true` when the `IPC_CHANNEL_WIN_DEBUG_TRACE` environment variable is set (to any value).
/// Evaluated once, on first use.
static DEBUG_TRACE_ENABLED: LazyLock<bool> =
    LazyLock::new(|| env::var_os("IPC_CHANNEL_WIN_DEBUG_TRACE").is_some());

/// `println!`-style tracing helper.
///
/// Output is produced only when the crate is built with the `win32-trace` feature
/// AND `DEBUG_TRACE_ENABLED` is true. The arguments are still type-checked when the
/// feature is off, because `cfg!` expands to a plain boolean.
macro_rules! win32_trace {
    ($($args:tt)*) => {
        if cfg!(feature = "win32-trace") && *DEBUG_TRACE_ENABLED {
            println!($($args)*);
        }
    };
}
/// Value reported by `OsIpcSender::get_max_fragment_size` (64 KiB).
const MAX_FRAGMENT_SIZE: usize = 64 * 1024;
/// `MAX_FRAGMENT_SIZE` plus 4 KiB. Passed to `CreateNamedPipeA` as a buffer size, used as the
/// growth increment for reader buffers, and used as the upper bound for a single in-band message.
const PIPE_BUFFER_SIZE: usize = MAX_FRAGMENT_SIZE + 4 * 1024;
/// Creates a connected sender/receiver pair over a freshly named pipe.
///
/// # Errors
/// Returns the `WinError` of whichever endpoint could not be created.
pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), WinError> {
    let pipe_name = make_pipe_name(&make_pipe_id());
    // The receiving end is set up first; the sender then connects to the same name.
    let receiver = OsIpcReceiver::new_named(&pipe_name)?;
    OsIpcSender::connect_named(&pipe_name).map(|sender| (sender, receiver))
}
/// Opens the pipe called `pipe_name` for reading and writing, creating it if opening fails.
///
/// `CreateFileA` is tried first; on any error a new overlapped, byte-mode, single-instance
/// pipe rejecting remote clients is created with `CreateNamedPipeA` instead.
///
/// # Safety
/// Calls raw Win32 functions with a pointer derived from `pipe_name`.
unsafe fn create_duplex(pipe_name: &CString) -> Result<HANDLE, WinError> {
    let name = PCSTR::from_raw(pipe_name.as_ptr() as *const u8);
    let opened = CreateFileA(
        name,
        FILE_GENERIC_WRITE.0 | FILE_GENERIC_READ.0,
        FILE_SHARE_MODE(0),
        None,
        OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL,
        None,
    );
    match opened {
        Ok(handle) => Ok(handle),
        // Opening failed: become the creating side of the pipe.
        Err(_) => CreateNamedPipeA(
            name,
            PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
            1,
            0,
            PIPE_BUFFER_SIZE as u32,
            0,
            None,
        ),
    }
}
/// Fixed-size header placed in front of every in-band message.
///
/// The header is written to and read from the pipe as raw bytes, so its layout is pinned
/// with `#[repr(C)]`: `data_len` first, then `oob_len`, 8 bytes in total.
#[repr(C)]
struct MessageHeader {
    /// Number of in-band payload bytes following the header.
    data_len: u32,
    /// Number of serialized out-of-band bytes following the payload.
    oob_len: u32,
}

impl MessageHeader {
    /// Total size of the message on the wire: header + payload + out-of-band bytes.
    fn total_message_bytes_needed(&self) -> usize {
        mem::size_of::<MessageHeader>() + self.data_len as usize + self.oob_len as usize
    }
}
struct Message<'data> {
data_len: usize,
oob_len: usize,
bytes: &'data [u8],
}
impl<'data> Message<'data> {
fn from_bytes(bytes: &'data [u8]) -> Option<Message<'data>> {
if bytes.len() < mem::size_of::<MessageHeader>() {
return None;
}
unsafe {
let header = &(*(bytes.as_ptr() as *const MessageHeader));
if bytes.len() < header.total_message_bytes_needed() {
return None;
}
Some(Message {
data_len: header.data_len as usize,
oob_len: header.oob_len as usize,
bytes: &bytes[0..header.total_message_bytes_needed()],
})
}
}
fn data(&self) -> &[u8] {
&self.bytes
[mem::size_of::<MessageHeader>()..(mem::size_of::<MessageHeader>() + self.data_len)]
}
fn oob_bytes(&self) -> &[u8] {
&self.bytes[(mem::size_of::<MessageHeader>() + self.data_len)..]
}
fn oob_data(&self) -> Option<OutOfBandMessage> {
if self.oob_len > 0 {
let mut oob = postcard::from_bytes::<OutOfBandMessage>(self.oob_bytes())
.expect("Failed to deserialize OOB data");
if let Err(e) = oob.recover_handles() {
win32_trace!("Failed to recover handles: {:?}", e);
return None;
}
Some(oob)
} else {
None
}
}
fn size(&self) -> usize {
mem::size_of::<MessageHeader>() + self.data_len + self.oob_len
}
}
/// Out-of-band part of a message: raw handle values that travel alongside the payload.
#[derive(Debug)]
struct OutOfBandMessage {
    /// Id of the process in which the handle values below are valid.
    target_process_id: u32,
    /// Raw handles of transferred channel endpoints.
    channel_handles: Vec<isize>,
    /// Raw handles of shared-memory mappings, each paired with its length in bytes.
    shmem_handles: Vec<(isize, u64)>,
    /// Raw handle of the side-channel receiver carrying an oversized payload, with that
    /// payload's length in bytes.
    big_data_receiver_handle: Option<(isize, u64)>,
}

impl OutOfBandMessage {
    /// Creates an empty out-of-band message addressed to process `target_id`.
    fn new(target_id: u32) -> OutOfBandMessage {
        OutOfBandMessage {
            target_process_id: target_id,
            channel_handles: vec![],
            shmem_handles: vec![],
            big_data_receiver_handle: None,
        }
    }

    /// Whether there is any handle to transmit at all.
    fn needs_to_be_sent(&self) -> bool {
        !self.channel_handles.is_empty()
            || !self.shmem_handles.is_empty()
            || self.big_data_receiver_handle.is_some()
    }

    /// Duplicates every carried handle from process `target_process_id` into the current
    /// process and replaces the stored raw values with the duplicates.
    ///
    /// # Errors
    /// Returns the first `OpenProcess` / `DuplicateHandle` / `CloseHandle` failure.
    /// The process handle opened here is closed on every path, including when a
    /// duplication fails part-way through.
    fn recover_handles(&mut self) -> Result<(), WinError> {
        let current_process = unsafe { GetCurrentProcess() };
        let target_process =
            unsafe { OpenProcess(PROCESS_DUP_HANDLE, false, self.target_process_id)? };

        // Duplicates one raw handle into this process, overwriting it in place.
        let import = |raw: &mut isize| -> Result<(), WinError> {
            let mut new_handle = INVALID_HANDLE_VALUE;
            unsafe {
                DuplicateHandle(
                    target_process,
                    HANDLE(*raw as _),
                    current_process,
                    &mut new_handle,
                    0,
                    false,
                    DUPLICATE_SAME_ACCESS,
                )?;
            }
            *raw = new_handle.0 as isize;
            Ok(())
        };

        // Same order as the fields: channels, shared memory, then the big-data receiver.
        let duplicated = self
            .channel_handles
            .iter_mut()
            .chain(self.shmem_handles.iter_mut().map(|(handle, _)| handle))
            .chain(
                self.big_data_receiver_handle
                    .iter_mut()
                    .map(|(handle, _)| handle),
            )
            .try_for_each(import);

        // Close the process handle before propagating, so a failed duplication cannot leak it.
        let closed = unsafe { CloseHandle(target_process) };
        duplicated?;
        closed
    }
}
impl serde_core::Serialize for OutOfBandMessage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde_core::Serializer,
{
(
self.target_process_id,
&self.channel_handles,
&self.shmem_handles,
&self.big_data_receiver_handle,
)
.serialize(serializer)
}
}
/// Deserializes the 4-tuple written by the `Serialize` impl, in the same field order.
impl<'de> serde_core::Deserialize<'de> for OutOfBandMessage {
    fn deserialize<D>(deserializer: D) -> Result<OutOfBandMessage, D::Error>
    where
        D: serde_core::Deserializer<'de>,
    {
        let fields: (u32, Vec<isize>, Vec<(isize, u64)>, Option<(isize, u64)>) =
            serde_core::Deserialize::deserialize(deserializer)?;
        let (target_process_id, channel_handles, shmem_handles, big_data_receiver_handle) = fields;
        Ok(OutOfBandMessage {
            target_process_id,
            channel_handles,
            shmem_handles,
            big_data_receiver_handle,
        })
    }
}
/// Generates a fresh random (version 4) UUID used to name a pipe.
fn make_pipe_id() -> Uuid {
Uuid::new_v4()
}
/// Builds the pipe path `\\.\pipe\rust-ipc-<uuid>` as a NUL-terminated C string.
///
/// # Panics
/// Panics if the formatted path contains an interior NUL byte.
fn make_pipe_name(pipe_id: &Uuid) -> CString {
    let path = format!("\\\\.\\pipe\\rust-ipc-{}", pipe_id);
    CString::new(path).unwrap()
}
/// Duplicates `handle` from the current process into `other_process` with the given
/// `DuplicateHandle` options.
///
/// An invalid `handle` is not duplicated; an invalid `WinHandle` is returned instead.
/// The returned handle value is meaningful in `other_process`.
///
/// # Errors
/// Returns the `DuplicateHandle` failure.
fn dup_handle_to_process_with_flags(
    handle: &WinHandle,
    other_process: &WinHandle,
    flags: DUPLICATE_HANDLE_OPTIONS,
) -> Result<WinHandle, WinError> {
    if !handle.is_valid() {
        return Ok(WinHandle::invalid());
    }
    let mut duplicated: HANDLE = INVALID_HANDLE_VALUE;
    unsafe {
        DuplicateHandle(
            CURRENT_PROCESS_HANDLE.as_raw(),
            handle.as_raw(),
            other_process.as_raw(),
            &mut duplicated,
            0,
            false,
            flags,
        )?;
    }
    Ok(WinHandle::new(duplicated))
}
/// Duplicates `handle` within the current process, keeping the same access rights.
fn dup_handle(handle: &WinHandle) -> Result<WinHandle, WinError> {
    let this_process = WinHandle::new(CURRENT_PROCESS_HANDLE.as_raw());
    dup_handle_to_process(handle, &this_process)
}
/// Duplicates `handle` into `other_process` with `DUPLICATE_SAME_ACCESS`; the source handle
/// is left open.
fn dup_handle_to_process(
handle: &WinHandle,
other_process: &WinHandle,
) -> Result<WinHandle, WinError> {
dup_handle_to_process_with_flags(handle, other_process, DUPLICATE_SAME_ACCESS)
}
/// Transfers ownership of `handle` to `other_process`.
///
/// The duplication uses `DUPLICATE_CLOSE_SOURCE`, so the local `WinHandle` is forgotten
/// afterwards rather than dropped; dropping it would try to close the handle again.
fn move_handle_to_process(
    handle: WinHandle,
    other_process: &WinHandle,
) -> Result<WinHandle, WinError> {
    let transfer_flags = DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS;
    let moved = dup_handle_to_process_with_flags(&handle, other_process, transfer_flags);
    mem::forget(handle);
    moved
}
/// Owning wrapper around a raw Win32 `HANDLE`; closes the handle on drop unless it equals
/// `INVALID_HANDLE_VALUE`.
#[derive(Debug)]
struct WinHandle {
    handle: HANDLE,
}

unsafe impl Send for WinHandle {}
unsafe impl Sync for WinHandle {}

impl Drop for WinHandle {
    /// Closes the handle if it is valid.
    ///
    /// # Panics
    /// Panics if `CloseHandle` fails, unless the thread is already panicking.
    fn drop(&mut self) {
        if !self.is_valid() {
            return;
        }
        let closed = unsafe { CloseHandle(self.handle) };
        assert!(closed.is_ok() || thread::panicking());
    }
}

impl Default for WinHandle {
    /// The default handle is the invalid one.
    fn default() -> WinHandle {
        Self {
            handle: INVALID_HANDLE_VALUE,
        }
    }
}

impl PartialEq for WinHandle {
    /// Two handles are equal when `CompareObjectHandles` reports that they refer to the
    /// same kernel object.
    fn eq(&self, other: &WinHandle) -> bool {
        unsafe { CompareObjectHandles(self.handle, other.handle).into() }
    }
}

impl WinHandle {
    /// Takes ownership of `handle`.
    fn new(handle: HANDLE) -> WinHandle {
        Self { handle }
    }

    /// A placeholder holding `INVALID_HANDLE_VALUE`; dropping it closes nothing.
    fn invalid() -> WinHandle {
        Self {
            handle: INVALID_HANDLE_VALUE,
        }
    }

    /// Whether the wrapped value differs from `INVALID_HANDLE_VALUE`.
    fn is_valid(&self) -> bool {
        self.handle != INVALID_HANDLE_VALUE
    }

    /// The raw handle, still owned by `self`.
    fn as_raw(&self) -> HANDLE {
        self.handle
    }

    /// Gives up ownership: returns the raw handle and leaves `self` invalid.
    fn take_raw(&mut self) -> HANDLE {
        mem::replace(&mut self.handle, INVALID_HANDLE_VALUE)
    }

    /// Moves the handle into a new `WinHandle`, leaving `self` invalid.
    fn take(&mut self) -> WinHandle {
        Self::new(self.take_raw())
    }
}
/// State handed over to an in-flight overlapped read: the pipe handle, the `OVERLAPPED`
/// structure and the destination buffer.
#[derive(Debug)]
struct AsyncData {
/// Pipe handle the read was issued on.
handle: WinHandle,
/// Boxed overlapped structure passed to `ReadFile`; excluded from `Debug` output.
ov: NoDebug<Box<Overlapped>>,
/// Buffer the read writes into.
buf: Vec<u8>,
}
/// `OVERLAPPED` wrapper that closes the structure's event handle when dropped.
#[repr(transparent)]
struct Overlapped(OVERLAPPED);

impl Drop for Overlapped {
    /// Closes `hEvent` unless it is invalid.
    ///
    /// # Panics
    /// Panics if `CloseHandle` fails, unless the thread is already panicking.
    fn drop(&mut self) {
        let event = self.0.hEvent;
        if event.is_invalid() {
            return;
        }
        let closed = unsafe { CloseHandle(event) };
        assert!(closed.is_ok() || thread::panicking());
    }
}

impl Overlapped {
    /// Wraps `ov`, taking ownership of its event handle.
    fn new(ov: OVERLAPPED) -> Self {
        Overlapped(ov)
    }
}

impl Deref for Overlapped {
    type Target = OVERLAPPED;

    fn deref(&self) -> &OVERLAPPED {
        let Overlapped(inner) = self;
        inner
    }
}

impl DerefMut for Overlapped {
    fn deref_mut(&mut self) -> &mut OVERLAPPED {
        let Overlapped(inner) = self;
        inner
    }
}
/// Reads framed messages from a pipe handle using overlapped (asynchronous) reads.
#[derive(Debug)]
struct MessageReader {
/// The pipe handle. While a read is in flight it is moved into `r#async` and this field
/// holds an invalid handle.
handle: WinHandle,
/// Bytes received so far that have not yet been consumed as complete messages.
read_buf: Vec<u8>,
/// State of the in-flight overlapped read, if one has been started.
r#async: Option<AliasedCell<AsyncData>>,
/// Id assigned when this reader was added to an `OsIpcReceiverSet`; `None` otherwise.
entry_id: Option<u64>,
}
// NOTE(review): this `Send` impl is for `OsIpcReceiver`, although it sits next to
// `MessageReader`; the safety argument is not visible here — confirm.
unsafe impl Send for OsIpcReceiver {}
impl Drop for MessageReader {
fn drop(&mut self) {
// Cancel (and wait out) any in-flight read before the reader's fields go away.
self.cancel_io();
}
}
impl MessageReader {
fn new(handle: WinHandle) -> MessageReader {
MessageReader {
handle,
read_buf: Vec::new(),
r#async: None,
entry_id: None,
}
}
/// Moves the whole reader out, leaving a fresh reader with an invalid handle in its place.
fn take(&mut self) -> MessageReader {
mem::replace(self, MessageReader::new(WinHandle::invalid()))
}
/// Asks the OS to cancel the in-flight overlapped read via `CancelIoEx`.
///
/// If `CancelIoEx` fails with `ERROR_NOT_FOUND`, the async state is torn down here and the
/// handle and buffer are moved back into the reader. In every other case the async state
/// stays in place and the caller is expected to collect the completion.
///
/// # Panics
/// Panics if no read is in flight, or if `CancelIoEx` fails with any other error.
fn issue_async_cancel(&mut self) {
unsafe {
let result = CancelIoEx(
self.r#async.as_ref().unwrap().alias().handle.as_raw(),
self.r#async
.as_ref()
.map(|a| std::ptr::addr_of!(a.alias().ov.0.deref().0)),
);
if let Err(error) = result {
// NOTE(review): presumably ERROR_NOT_FOUND means there was no pending request left
// to cancel, so nothing remains to wait for — confirm against the CancelIoEx docs.
assert!(error.code() == ERROR_NOT_FOUND.to_hresult());
let async_data = self.r#async.take().unwrap().into_inner();
self.handle = async_data.handle;
self.read_buf = async_data.buf;
}
}
}
/// Cancels the in-flight read, if any, and waits until the reader owns its handle and
/// buffer again. Any error from the final wait is ignored.
///
/// # Panics
/// Panics if a read is in flight while the reader belongs to a receiver set
/// (`entry_id` is set).
fn cancel_io(&mut self) {
    if self.r#async.is_none() {
        return;
    }
    assert!(self.entry_id.is_none());
    self.issue_async_cancel();
    // The cancel request may already have torn the async state down; otherwise block
    // until the cancelled (or completed) operation reports back.
    if self.r#async.is_some() {
        let _ = self.fetch_async_result(BlockingMode::Blocking);
    }
}
/// Starts an overlapped read into the spare capacity of `read_buf`, unless one is already
/// in flight.
///
/// On success (including `ERROR_IO_PENDING`) the handle and buffer are owned by
/// `self.r#async` until the completion is collected.
///
/// # Errors
/// * `WinIpcError::ChannelClosed` if `ReadFile` reports `ERROR_BROKEN_PIPE`.
/// * Any other `CreateEventA` / `ReadFile` failure, converted into `WinIpcError`.
///
/// On every error path the reader keeps its handle and its buffer (with its original
/// length).
fn start_read(&mut self) -> Result<(), WinIpcError> {
    if self.r#async.is_some() {
        return Ok(());
    }
    win32_trace!("[$ {:?}] start_read", self.handle);
    if self.read_buf.len() == self.read_buf.capacity() {
        self.read_buf.reserve(PIPE_BUFFER_SIZE);
    }
    unsafe {
        // Create the event BEFORE touching the reader's state: if this fails, the pipe
        // handle must not have been moved out of `self.handle` (a temporary `WinHandle`
        // would close it on drop) and `read_buf` must not have had its length changed.
        let mut overlapped: OVERLAPPED = mem::zeroed();
        overlapped.hEvent = CreateEventA(None, true, false, None)?;
        let ov = NoDebug(Box::new(Overlapped::new(overlapped)));

        let buf_len = self.read_buf.len();
        let buf_cap = self.read_buf.capacity();
        // Temporarily expose the whole capacity so the tail can be handed to `ReadFile`.
        self.read_buf.set_len(buf_cap);
        self.r#async = Some(AliasedCell::new(AsyncData {
            handle: self.handle.take(),
            ov,
            buf: mem::take(&mut self.read_buf),
        }));
        let result = {
            let async_data = self.r#async.as_mut().unwrap().alias_mut();
            let remaining_buf = &mut async_data.buf[buf_len..];
            ReadFile(
                async_data.handle.as_raw(),
                Some(remaining_buf),
                None,
                Some(&mut ***async_data.ov.deref_mut()),
            )
        };
        // Back to the number of valid bytes; the length grows again once the read's
        // completion is processed.
        self.r#async
            .as_mut()
            .unwrap()
            .alias_mut()
            .buf
            .set_len(buf_len);
        match result.as_ref().map_err(|e| e.code()) {
            Ok(_) => Ok(()),
            Err(err) if err == ERROR_IO_PENDING.to_hresult() => Ok(()),
            Err(err) => {
                // The read was not started: take handle and buffer back.
                let async_data = self.r#async.take().unwrap().into_inner();
                self.handle = async_data.handle;
                self.read_buf = async_data.buf;
                if err == ERROR_BROKEN_PIPE.to_hresult() {
                    win32_trace!("[$ {:?}] BROKEN_PIPE straight from ReadFile", self.handle);
                    Err(WinIpcError::ChannelClosed)
                } else {
                    result.map_err(|e| e.into())
                }
            },
        }
    }
}
/// Finishes the in-flight read: moves handle and buffer back into the reader and, when
/// `io_result` is `Ok`, extends `read_buf` by the number of bytes the read reported.
///
/// # Errors
/// `WinIpcError::ChannelClosed` when `io_result` carries `ERROR_BROKEN_PIPE`; any other
/// error in `io_result` is converted and returned.
///
/// # Panics
/// Panics if no read is in flight, if the overlapped offset is non-zero, or if the new
/// length would exceed the buffer's capacity.
///
/// # Safety
/// NOTE(review): presumably must only be called once the OS has finished with the
/// overlapped operation, since the buffer is taken out of the `AliasedCell` and its length
/// is set from `InternalHigh` — confirm against `AliasedCell`'s contract.
unsafe fn notify_completion(
&mut self,
io_result: Result<(), WinError>,
) -> Result<(), WinIpcError> {
win32_trace!(
"[$ {:?}] notify_completion",
self.r#async.as_ref().unwrap().alias().handle
);
// Reclaim ownership of everything that was lent to the async operation.
let async_data = self.r#async.take().unwrap().into_inner();
self.handle = async_data.handle;
let ov = async_data.ov;
self.read_buf = async_data.buf;
match io_result {
Ok(()) => {},
Err(err) => {
if err.code() == ERROR_BROKEN_PIPE.to_hresult() {
return Err(WinIpcError::ChannelClosed);
}
return Err(err.into());
},
}
// The byte count of the finished read is taken from the OVERLAPPED structure.
let nbytes = ov.InternalHigh as u32;
let offset = ov.Anonymous.Anonymous.Offset;
assert!(offset == 0);
let new_size = self.read_buf.len() + nbytes as usize;
win32_trace!(
"nbytes: {}, offset {}, buf len {}->{}, capacity {}",
nbytes,
offset,
self.read_buf.len(),
new_size,
self.read_buf.capacity()
);
assert!(new_size <= self.read_buf.capacity());
self.read_buf.set_len(new_size);
Ok(())
}
/// Collects the result of the in-flight read, waiting according to `blocking_mode`
/// (`INFINITE`, 0 ms, or the timeout in milliseconds; a timeout that does not fit in `u32`
/// becomes `INFINITE`).
///
/// # Errors
/// * `WinIpcError::NoData` when the mode is not `Blocking` and the operation is still
///   incomplete (the read stays in flight), or when the wait timed out (the read is
///   cancelled first).
/// * Whatever `notify_completion` returns for a finished operation.
///
/// # Panics
/// Panics if no read is in flight.
fn fetch_async_result(&mut self, blocking_mode: BlockingMode) -> Result<(), WinIpcError> {
unsafe {
let mut nbytes: u32 = 0;
let timeout = match blocking_mode {
BlockingMode::Blocking => INFINITE,
BlockingMode::Nonblocking => 0,
BlockingMode::Timeout(duration) => {
duration.as_millis().try_into().unwrap_or(INFINITE)
},
};
let result = GetOverlappedResultEx(
self.r#async.as_ref().unwrap().alias().handle.as_raw(),
&***self.r#async.as_mut().unwrap().alias_mut().ov.deref(),
&mut nbytes,
timeout,
false,
);
// The event is reset manually here; a failure to reset is ignored.
let _ = ResetEvent(
self.r#async
.as_mut()
.unwrap()
.alias_mut()
.ov
.deref_mut()
.hEvent,
);
let io_result = if result.is_err() {
// NOTE(review): `GetLastError` is read after the intervening `ResetEvent` call; this
// relies on `ResetEvent` not altering the thread's last-error value — confirm, or
// use the error already captured in `result`.
let err = GetLastError();
if blocking_mode != BlockingMode::Blocking && err == ERROR_IO_INCOMPLETE {
return Err(WinIpcError::NoData);
}
// The wait timed out: cancel the read before reporting that there is no data.
if err.0 == WAIT_TIMEOUT.0 {
self.cancel_io();
return Err(WinIpcError::NoData);
}
Err(WinError::new(err.to_hresult(), ""))
} else {
Ok(())
};
self.notify_completion(io_result)
}
}
/// Tries to extract one complete message from the front of `read_buf`.
///
/// Returns `Ok(None)` while a read is in flight or when the buffer does not yet hold a
/// whole message. On success the message's bytes are removed from the buffer.
///
/// Out-of-band handles are turned into channels and shared-memory regions; when a
/// big-data receiver handle is present, the payload is read from that side channel
/// instead of from the in-band bytes.
///
/// # Errors
/// Propagates a failure to read the big-data payload.
///
/// # Panics
/// Panics if a received shared-memory handle cannot be turned into `OsIpcSharedMemory`.
fn get_message(&mut self) -> Result<Option<IpcMessage>, WinIpcError> {
    if self.r#async.is_some() {
        return Ok(None);
    }
    let Some(message) = Message::from_bytes(&self.read_buf) else {
        return Ok(None);
    };

    let mut channels: Vec<OsOpaqueIpcChannel> = vec![];
    let mut shmems: Vec<OsIpcSharedMemory> = vec![];
    let mut big_data = None;
    if let Some(oob) = message.oob_data() {
        win32_trace!("[$ {:?}] msg with total {} bytes, {} channels, {} shmems, big data handle {:?}",
            self.handle, message.data_len, oob.channel_handles.len(), oob.shmem_handles.len(),
            oob.big_data_receiver_handle);
        channels.extend(
            oob.channel_handles
                .into_iter()
                .map(|handle| OsOpaqueIpcChannel::new(WinHandle::new(HANDLE(handle as _)))),
        );
        for (handle, size) in oob.shmem_handles {
            let region =
                OsIpcSharedMemory::from_handle(WinHandle::new(HANDLE(handle as _)), size as usize)
                    .unwrap();
            shmems.push(region);
        }
        if let Some((handle, big_data_size)) = oob.big_data_receiver_handle {
            let receiver = OsIpcReceiver::from_handle(WinHandle::new(HANDLE(handle as _)));
            big_data = Some(receiver.recv_raw(big_data_size as usize)?);
        }
    }

    let payload = big_data.unwrap_or_else(|| message.data().to_vec());
    win32_trace!(
        "[$ {:?}] get_message success -> {} bytes, {} channels, {} shmems",
        self.handle,
        payload.len(),
        channels.len(),
        shmems.len()
    );
    let consumed = message.size();
    let ipc_message = IpcMessage::new(payload, channels, shmems);

    // Drop the consumed bytes; whatever follows belongs to the next message.
    if self.read_buf.len() == consumed {
        self.read_buf.clear();
    } else {
        self.read_buf.drain(0..consumed);
    }
    Ok(Some(ipc_message))
}
/// Associates the reader's pipe handle with the completion port `iocp`, records
/// `entry_id`, and starts the first read.
///
/// The raw handle value doubles as the completion key.
///
/// # Errors
/// Returns a `CreateIoCompletionPort` failure, or whatever `start_read` reports.
///
/// # Panics
/// Panics if the reader already has an entry id.
fn add_to_iocp(&mut self, iocp: &WinHandle, entry_id: u64) -> Result<(), WinIpcError> {
    assert!(self.entry_id.is_none());
    let pipe = self.handle.as_raw();
    let completion_key = pipe.0 as usize;
    unsafe {
        CreateIoCompletionPort(pipe, Some(iocp.as_raw()), completion_key, 0)?;
    }
    self.entry_id = Some(entry_id);
    self.start_read()
}
fn read_raw_sized(mut self, size: usize) -> Result<Vec<u8>, WinIpcError> {
assert!(self.read_buf.is_empty());
self.read_buf.reserve(size);
while self.read_buf.len() < size {
match self.start_read() {
Err(WinIpcError::ChannelClosed) => {
return Err(WinError::new(ERROR_BROKEN_PIPE.to_hresult(), "ReadFile").into());
},
Err(err) => return Err(err),
Ok(()) => {},
};
match self.fetch_async_result(BlockingMode::Blocking) {
Err(WinIpcError::ChannelClosed) => {
return Err(WinError::new(ERROR_BROKEN_PIPE.to_hresult(), "ReadFile").into());
},
Err(err) => return Err(err),
Ok(()) => {},
};
}
Ok(std::mem::take(&mut self.read_buf))
}
/// Raw value of `self.handle` (invalid while a read is in flight, since the handle is then
/// held by the async state).
fn get_raw_handle(&self) -> HANDLE {
self.handle.as_raw()
}
}
/// Controls how `write_buf` treats a partial write.
#[derive(Clone, Copy, Debug)]
enum AtomicMode {
/// The whole buffer must go out in a single `WriteFile` call; a partial write panics.
Atomic,
/// Partial writes are allowed; writing continues until the buffer is exhausted.
Nonatomic,
}
/// Writes all of `bytes` to `handle` with synchronous `WriteFile` calls.
///
/// An empty buffer is a no-op.
///
/// # Errors
/// Returns the first `WriteFile` failure.
///
/// # Panics
/// In `AtomicMode::Atomic`, panics if a single `WriteFile` call does not write the whole
/// buffer.
fn write_buf(handle: &WinHandle, bytes: &[u8], atomic: AtomicMode) -> Result<(), WinError> {
    let total = bytes.len();
    let mut written = 0;
    while written < total {
        let mut sz: u32 = 0;
        unsafe {
            WriteFile(handle.as_raw(), Some(&bytes[written..]), Some(&mut sz), None)?;
        }
        written += sz as usize;
        match atomic {
            AtomicMode::Nonatomic => {
                win32_trace!(
                    "[c {:?}] ... wrote {} bytes, total {}/{} err {}",
                    handle.as_raw(),
                    sz,
                    written,
                    total,
                    WinError::from_win32()
                );
            },
            AtomicMode::Atomic => {
                if written != total {
                    panic!("Windows IPC write_buf expected to write full buffer, but only wrote partial (wrote {} out of {} bytes)", written, total);
                }
            },
        }
    }
    Ok(())
}
/// How long a receive operation may wait for a pending read to complete.
#[derive(Clone, Copy, Debug, PartialEq)]
enum BlockingMode {
/// Wait indefinitely.
Blocking,
/// Do not wait at all.
Nonblocking,
/// Wait at most the given duration.
Timeout(Duration),
}
/// Receiving end of a channel; wraps a `MessageReader` in a `RefCell` so that receiving
/// works through a shared reference.
#[derive(Debug)]
pub struct OsIpcReceiver {
reader: RefCell<MessageReader>,
}
/// Receivers compare equal when their readers' handles compare equal.
impl PartialEq for OsIpcReceiver {
fn eq(&self, other: &OsIpcReceiver) -> bool {
self.reader.borrow().handle == other.reader.borrow().handle
}
}
impl OsIpcReceiver {
/// Wraps an existing pipe handle in a receiver with a fresh, idle reader.
fn from_handle(handle: WinHandle) -> OsIpcReceiver {
    let reader = RefCell::new(MessageReader::new(handle));
    OsIpcReceiver { reader }
}
/// Creates a receiver on the pipe called `pipe_name` (see `create_duplex`).
///
/// # Errors
/// Returns the failure from `create_duplex`.
fn new_named(pipe_name: &CString) -> Result<OsIpcReceiver, WinError> {
    let handle = unsafe { create_duplex(pipe_name)? };
    let reader = MessageReader::new(WinHandle::new(handle));
    Ok(OsIpcReceiver {
        reader: RefCell::new(reader),
    })
}
/// Cancels any in-flight read so the handle can be moved elsewhere.
///
/// Returns `Ok(true)` when no unconsumed bytes remain in the read buffer, `Ok(false)`
/// otherwise. The function never returns `Err`.
fn prepare_for_transfer(&self) -> Result<bool, WinError> {
let mut reader = self.reader.borrow_mut();
reader.cancel_io();
Ok(reader.read_buf.is_empty())
}
/// Moves the pipe handle into a new receiver, leaving this one with an invalid handle.
///
/// # Panics
/// Panics if a read is currently in flight.
pub fn consume(&self) -> OsIpcReceiver {
let mut reader = self.reader.borrow_mut();
assert!(reader.r#async.is_none());
OsIpcReceiver::from_handle(reader.handle.take())
}
/// Receives one message, waiting according to `blocking_mode`.
///
/// `blocking_mode` applies only to the first wait: once one read has completed, the
/// remainder of the message is awaited in blocking mode.
///
/// # Errors
/// Propagates errors from `get_message`, `start_read` and `fetch_async_result`.
///
/// # Panics
/// Panics if the receiver has been added to a receiver set.
fn receive_message(&self, mut blocking_mode: BlockingMode) -> Result<IpcMessage, WinIpcError> {
let mut reader = self.reader.borrow_mut();
assert!(
reader.entry_id.is_none(),
"receive_message is only valid before this OsIpcReceiver was added to a Set"
);
loop {
// First see whether the buffer already holds a complete message.
if let Some(ipc_message) = reader.get_message()? {
return Ok(ipc_message);
}
reader.start_read()?;
reader.fetch_async_result(blocking_mode)?;
// Data has started to arrive; wait for the rest unconditionally.
blocking_mode = BlockingMode::Blocking;
}
}
/// Receives one message, blocking until it is available.
pub fn recv(&self) -> Result<IpcMessage, WinIpcError> {
win32_trace!("recv");
self.receive_message(BlockingMode::Blocking)
}
/// Receives one message without waiting for data to start arriving.
pub fn try_recv(&self) -> Result<IpcMessage, WinIpcError> {
win32_trace!("try_recv");
self.receive_message(BlockingMode::Nonblocking)
}
/// Receives one message, waiting at most `duration` for data to start arriving.
pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, WinIpcError> {
win32_trace!("try_recv_timeout");
self.receive_message(BlockingMode::Timeout(duration))
}
/// Waits for a client to connect to this receiver's pipe using an overlapped
/// `ConnectNamedPipe`.
///
/// `ERROR_PIPE_CONNECTED` and `ERROR_NO_DATA` are treated as success; for
/// `ERROR_IO_PENDING` the call blocks in `GetOverlappedResult` until the connection
/// completes.
///
/// # Errors
/// Any other `ConnectNamedPipe` error, or a `GetOverlappedResult` failure.
///
/// # Panics
/// Panics if `ConnectNamedPipe` reports immediate success.
fn accept(&self) -> Result<(), WinError> {
unsafe {
let reader_borrow = self.reader.borrow();
let handle = &reader_borrow.handle;
let mut ov = AliasedCell::new(Box::new(mem::zeroed::<OVERLAPPED>()));
let result = ConnectNamedPipe(handle.as_raw(), Some(ov.alias_mut().deref_mut()));
// NOTE(review): assumes an overlapped ConnectNamedPipe never returns success
// directly — confirm against the Win32 documentation.
assert!(result.is_err());
let result = match GetLastError() {
ERROR_PIPE_CONNECTED => {
win32_trace!("[$ {:?}] accept (PIPE_CONNECTED)", handle.as_raw());
Ok(())
},
ERROR_NO_DATA => {
win32_trace!("[$ {:?}] accept (ERROR_NO_DATA)", handle.as_raw());
Ok(())
},
ERROR_IO_PENDING => {
let mut nbytes: u32 = 0;
// Final argument `true`: block until the pending connect finishes.
GetOverlappedResult(
handle.as_raw(),
ov.alias_mut().deref_mut(),
&mut nbytes,
true,
)
},
err => {
win32_trace!("[$ {:?}] accept error -> {:?}", handle.as_raw(), err);
Err(WinError::new(err.to_hresult(), "ConnectNamedPipe"))
},
};
// Take the OVERLAPPED back out of the aliased cell now that the operation is over.
ov.into_inner();
result
}
}
/// Consumes the receiver and reads at least `size` raw bytes from it (no message framing).
fn recv_raw(self, size: usize) -> Result<Vec<u8>, WinIpcError> {
self.reader.into_inner().read_raw_sized(size)
}
}
/// Sending end of a channel; owns the pipe handle it writes to.
#[derive(Debug)]
pub struct OsIpcSender {
handle: WinHandle,
}
impl Clone for OsIpcSender {
/// Clones the sender by duplicating its handle.
///
/// # Panics
/// Panics if the handle cannot be duplicated.
fn clone(&self) -> OsIpcSender {
OsIpcSender::from_handle(dup_handle(&self.handle).unwrap())
}
}
impl OsIpcSender {
pub fn connect(name: String) -> Result<OsIpcSender, WinError> {
let pipe_name = make_pipe_name(&Uuid::parse_str(&name).unwrap());
OsIpcSender::connect_named(&pipe_name)
}
/// Returns `MAX_FRAGMENT_SIZE`.
pub fn get_max_fragment_size() -> usize {
MAX_FRAGMENT_SIZE
}
/// Wraps an existing pipe handle in a sender.
fn from_handle(handle: WinHandle) -> OsIpcSender {
OsIpcSender { handle }
}
/// Creates a sender on the pipe called `pipe_name` (see `create_duplex`).
///
/// # Errors
/// Returns the failure from `create_duplex`.
fn connect_named(pipe_name: &CString) -> Result<OsIpcSender, WinError> {
    let handle = unsafe { create_duplex(pipe_name)? };
    win32_trace!("[c {:?}] connect_to_server success", handle);
    Ok(OsIpcSender::from_handle(WinHandle::new(handle)))
}
/// Queries the process id of the server side of this sender's pipe.
fn get_pipe_server_process_id(&self) -> Result<u32, WinError> {
    let mut server_pid = 0;
    unsafe {
        GetNamedPipeServerProcessId(self.handle.as_raw(), &mut server_pid)?;
    }
    Ok(server_pid)
}
/// Returns a process handle usable for handle duplication into the pipe's server
/// process, together with that process's id.
///
/// When the server is the current process, the cached current-process handle is reused;
/// otherwise the process is opened with `PROCESS_DUP_HANDLE`.
fn get_pipe_server_process_handle_and_pid(&self) -> Result<(WinHandle, u32), WinError> {
    let server_pid = self.get_pipe_server_process_id()?;
    let raw_handle = if server_pid == *CURRENT_PROCESS_ID {
        CURRENT_PROCESS_HANDLE.as_raw()
    } else {
        unsafe { OpenProcess(PROCESS_DUP_HANDLE, false, server_pid)? }
    };
    Ok((WinHandle::new(raw_handle), server_pid))
}
/// Decides whether a payload of `data_len` bytes is too large to travel in-band next to
/// the serialized form of `oob`.
///
/// # Panics
/// Panics if the out-of-band data cannot be sized, or alone exceeds the space available
/// after the header ("too much oob data").
fn needs_fragmentation(data_len: usize, oob: &OutOfBandMessage) -> bool {
    // Room left in one pipe buffer once the fixed header is accounted for.
    let capacity = PIPE_BUFFER_SIZE - mem::size_of::<MessageHeader>();
    let oob_size = if oob.needs_to_be_sent() {
        postcard::experimental::serialized_size(oob).unwrap()
    } else {
        0
    };
    assert!((oob_size as usize) <= capacity, "too much oob data");
    data_len >= capacity - (oob_size as usize)
}
/// Writes `data` to the pipe as-is (no header), allowing partial writes.
fn send_raw(&self, data: &[u8]) -> Result<(), WinError> {
win32_trace!(
"[c {:?}] writing {} bytes raw to (pid {}->{})",
self.handle.as_raw(),
data.len(),
*CURRENT_PROCESS_ID,
self.get_pipe_server_process_id()?
);
write_buf(&self.handle, data, AtomicMode::Nonatomic)
}
/// Sends `data` together with the given channel endpoints and shared-memory regions.
///
/// Handles are duplicated (shared memory) or moved (channels) into the pipe's server
/// process and their raw values are sent as out-of-band data. When the payload does not
/// fit in-band, a dedicated channel is created, its receiver is transferred as well, and
/// the payload is written raw to that channel after the header message.
///
/// # Errors
/// Propagates failures from looking up the server process, duplicating handles, creating
/// the side channel, and writing to the pipe.
///
/// # Panics
/// Panics if `data` is longer than `u32::MAX`, if a receiver being sent still has
/// unconsumed bytes buffered, if the out-of-band data cannot be serialized, or if the
/// in-band message would exceed `PIPE_BUFFER_SIZE`.
pub fn send(
&self,
data: &[u8],
ports: Vec<OsIpcChannel>,
shared_memory_regions: Vec<OsIpcSharedMemory>,
) -> Result<(), WinIpcError> {
assert!(data.len() <= u32::MAX as usize);
// The server process is only looked up when there are handles to transfer.
let (server_h, server_pid) = if !shared_memory_regions.is_empty() || !ports.is_empty() {
self.get_pipe_server_process_handle_and_pid()?
} else {
(WinHandle::invalid(), 0)
};
let mut oob = OutOfBandMessage::new(server_pid);
for ref shmem in shared_memory_regions {
// Shared memory is duplicated: the local mapping handle stays owned by `shmem`.
let mut remote_handle = dup_handle_to_process(&shmem.handle, &server_h)?;
// `take_raw` gives up ownership so the remote handle value is not closed locally.
oob.shmem_handles
.push((remote_handle.take_raw().0 as _, shmem.length as u64));
}
for port in ports {
match port {
OsIpcChannel::Sender(s) => {
// Channel endpoints are moved: the local handle is closed by the duplication.
let mut raw_remote_handle = move_handle_to_process(s.handle, &server_h)?;
oob.channel_handles
.push(raw_remote_handle.take_raw().0 as _);
},
OsIpcChannel::Receiver(r) => {
if !(r.prepare_for_transfer()?) {
panic!("Sending receiver with outstanding partial read buffer, noooooo! What should even happen?");
}
let handle = r.reader.into_inner().handle.take();
let mut raw_remote_handle = move_handle_to_process(handle, &server_h)?;
oob.channel_handles
.push(raw_remote_handle.take_raw().0 as _);
},
}
}
let big_data_sender: Option<OsIpcSender> =
if OsIpcSender::needs_fragmentation(data.len(), &oob) {
// Payload too large for one in-band message: ship it over a dedicated channel.
let (sender, receiver) = channel()?;
let (server_h, server_pid) = if server_h.is_valid() {
(server_h, server_pid)
} else {
self.get_pipe_server_process_handle_and_pid()?
};
let handle = receiver.reader.into_inner().handle.take();
let mut raw_receiver_handle = move_handle_to_process(handle, &server_h)?;
oob.big_data_receiver_handle =
Some((raw_receiver_handle.take_raw().0 as _, data.len() as u64));
oob.target_process_id = server_pid;
Some(sender)
} else {
None
};
let mut oob_data: Vec<u8> = vec![];
if oob.needs_to_be_sent() {
oob_data = postcard::to_allocvec(&oob).unwrap();
}
// With a side channel, the in-band message carries no payload bytes at all.
let in_band_data_len = if big_data_sender.is_none() {
data.len()
} else {
0
};
let header = MessageHeader {
data_len: in_band_data_len as u32,
oob_len: oob_data.len() as u32,
};
let full_in_band_len = header.total_message_bytes_needed();
assert!(full_in_band_len <= PIPE_BUFFER_SIZE);
let mut full_message = Vec::<u8>::with_capacity(full_in_band_len);
{
// The header is emitted as the raw bytes of the struct.
let header_bytes = unsafe {
slice::from_raw_parts(&header as *const _ as *const u8, mem::size_of_val(&header))
};
full_message.extend_from_slice(header_bytes);
}
if big_data_sender.is_none() {
full_message.extend_from_slice(data);
full_message.extend_from_slice(&oob_data);
assert!(full_message.len() == full_in_band_len);
write_buf(&self.handle, &full_message, AtomicMode::Atomic)?;
} else {
full_message.extend_from_slice(&oob_data);
assert!(full_message.len() == full_in_band_len);
write_buf(&self.handle, &full_message, AtomicMode::Atomic)?;
// The header message goes first, then the payload over the side channel.
big_data_sender.unwrap().send_raw(data)?;
}
Ok(())
}
}
/// One event reported by a receiver-set selection.
pub enum OsIpcSelectionResult {
/// A message arrived on the receiver with the given id.
DataReceived(u64, IpcMessage),
/// The receiver with the given id was found closed.
ChannelClosed(u64),
}
/// A set of receivers multiplexed over one I/O completion port.
pub struct OsIpcReceiverSet {
/// Source of entry ids handed out by `add`.
incrementor: RangeFrom<u64>,
/// The I/O completion port all readers are associated with.
iocp: WinHandle,
/// Readers that currently have a read in flight.
readers: Vec<MessageReader>,
/// Ids of receivers found closed when they were added; reported by the next selection.
closed_readers: Vec<u64>,
}
impl Drop for OsIpcReceiverSet {
/// Cancels every outstanding read and drains the completion port until each cancelled
/// operation has reported back.
///
/// # Panics
/// Panics if fetching a completion from the port fails.
fn drop(&mut self) {
for reader in &mut self.readers {
reader.issue_async_cancel();
}
// Readers whose cancel already tore down the async state have nothing left to wait for.
self.readers.retain(|r| r.r#async.is_some());
while !self.readers.is_empty() {
// Each fetched completion removes its reader from `self.readers`.
let _ = self.fetch_iocp_result(INFINITE).unwrap();
}
}
}
impl OsIpcReceiverSet {
/// Creates an empty receiver set backed by a new I/O completion port.
///
/// # Errors
/// Returns the `CreateIoCompletionPort` failure.
pub fn new() -> Result<OsIpcReceiverSet, WinError> {
    let iocp = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, None, 0, 0)? };
    Ok(OsIpcReceiverSet {
        incrementor: 0..,
        iocp: WinHandle::new(iocp),
        readers: Vec::new(),
        closed_readers: Vec::new(),
    })
}
/// Adds `receiver` to the set and returns the id under which its events are reported.
///
/// A receiver that is already closed is still given an id; the closure is reported by the
/// next selection call.
///
/// # Errors
/// Returns any error other than `ChannelClosed` raised while registering the receiver
/// with the completion port or starting its first read.
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, WinIpcError> {
let mut reader = receiver.reader.into_inner();
let entry_id = self.incrementor.next().unwrap();
match reader.add_to_iocp(&self.iocp, entry_id) {
Ok(()) => {
win32_trace!(
"[# {:?}] ReceiverSet add {:?}, id {}",
self.iocp.as_raw(),
reader.get_raw_handle(),
entry_id
);
self.readers.push(reader);
},
Err(WinIpcError::ChannelClosed) => {
win32_trace!(
"[# {:?}] ReceiverSet add {:?} (closed), id {}",
self.iocp.as_raw(),
reader.get_raw_handle(),
entry_id
);
// Remember the id so the closure can be reported later; the reader itself is dropped.
self.closed_readers.push(entry_id);
},
Err(err) => return Err(err),
};
Ok(entry_id)
}
/// Waits up to `wait` milliseconds for one completion on the port and hands back the
/// reader it belongs to, removed from `self.readers`, together with the outcome of
/// finishing that reader's read.
///
/// # Errors
/// Returns the `GetQueuedCompletionStatus` error when no completion was dequeued (the
/// returned `OVERLAPPED` pointer is null), e.g. on timeout.
///
/// # Panics
/// Panics if the completion key is zero or does not match any reader in the set.
fn fetch_iocp_result(
    &mut self,
    wait: u32,
) -> Result<(MessageReader, Result<(), WinIpcError>), WinError> {
    let mut nbytes: u32 = 0;
    let mut completion_key = 0;
    let mut ov_ptr: *mut OVERLAPPED = ptr::null_mut();
    let status = unsafe {
        GetQueuedCompletionStatus(
            self.iocp.as_raw(),
            &mut nbytes,
            &mut completion_key,
            &mut ov_ptr,
            wait,
        )
    };
    win32_trace!(
        "[# {:?}] GetQueuedCS -> ok:{} nbytes:{} key:{:?}",
        self.iocp.as_raw(),
        status.is_ok(),
        nbytes,
        completion_key
    );
    // A failure without an OVERLAPPED pointer means nothing was dequeued; a failure with
    // one is the result of a specific I/O operation and is passed on to its reader.
    let io_result = match status {
        Err(err) if ov_ptr.is_null() => return Err(err),
        other => other,
    };
    assert!(!ov_ptr.is_null());
    assert!(completion_key != 0);
    // The completion key is the raw pipe handle value (see `add_to_iocp`).
    let reader_index = self
        .readers
        .iter()
        .position(|reader| {
            let raw_handle = reader.r#async.as_ref().unwrap().alias().handle.as_raw();
            raw_handle.0 as usize == completion_key
        })
        .expect(
            "Windows IPC ReceiverSet got notification for a receiver it doesn't know about",
        );
    let mut reader = self.readers.swap_remove(reader_index);
    win32_trace!(
        "[# {:?}] result for receiver {:?}",
        self.iocp.as_raw(),
        reader.get_raw_handle()
    );
    let result = unsafe { reader.notify_completion(io_result) };
    Ok((reader, result))
}
/// Blocks until at least one event (message or closure) is available and returns all
/// events gathered so far.
///
/// Closures recorded by `add` are reported first. Otherwise completions are processed one
/// at a time; every complete message buffered for the completed reader is reported, and
/// the reader is re-armed with a new read.
///
/// # Errors
/// Propagates completion-port, message-extraction and read-start failures other than
/// `ChannelClosed`, which is reported as an event instead.
///
/// # Panics
/// Panics if the set contains no receivers at all.
pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, WinIpcError> {
assert!(
self.readers.len() + self.closed_readers.len() > 0,
"selecting with no objects?"
);
win32_trace!(
"[# {:?}] select() with {} active and {} closed receivers",
self.iocp.as_raw(),
self.readers.len(),
self.closed_readers.len()
);
let mut selection_results = vec![];
// Receivers that were already closed when added are reported immediately.
self.closed_readers.drain(..).for_each(|entry_id| {
selection_results.push(OsIpcSelectionResult::ChannelClosed(entry_id))
});
while selection_results.is_empty() {
let (mut reader, result) = self.fetch_iocp_result(INFINITE)?;
let mut closed = match result {
Ok(()) => false,
Err(WinIpcError::ChannelClosed) => true,
Err(err) => return Err(err),
};
if !closed {
// One completed read may contain several messages; report them all.
while let Some(ipc_message) = reader.get_message()? {
win32_trace!(
"[# {:?}] receiver {:?} ({}) got a message",
self.iocp.as_raw(),
reader.get_raw_handle(),
reader.entry_id.unwrap()
);
selection_results.push(OsIpcSelectionResult::DataReceived(
reader.entry_id.unwrap(),
ipc_message,
));
}
win32_trace!(
"[# {:?}] receiver {:?} ({}) -- no message",
self.iocp.as_raw(),
reader.get_raw_handle(),
reader.entry_id.unwrap()
);
// Re-arm the reader and put it back into the set.
closed = match reader.start_read() {
Ok(()) => {
self.readers.push(reader.take());
false
},
Err(WinIpcError::ChannelClosed) => true,
Err(err) => return Err(err),
};
}
if closed {
win32_trace!(
"[# {:?}] receiver {:?} ({}) -- now closed!",
self.iocp.as_raw(),
reader.get_raw_handle(),
reader.entry_id.unwrap()
);
selection_results.push(OsIpcSelectionResult::ChannelClosed(
reader.entry_id.unwrap(),
));
}
}
win32_trace!("select() -> {} results", selection_results.len());
Ok(selection_results)
}
/// Like `select`, but with a zero wait on the completion port; returns
/// `OsTrySelectError::Empty` when nothing is ready.
pub fn try_select(&mut self) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
self.try_select_wait(0)
}
/// Like `select`, but gives up with `OsTrySelectError::Empty` once roughly `duration`
/// has passed without any event.
///
/// Each attempt waits for the time still remaining (an amount that does not fit in `u32`
/// milliseconds becomes `INFINITE`); an `Empty` attempt is retried until the budget is
/// used up.
pub fn try_select_timeout(
    &mut self,
    duration: Duration,
) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
    let mut remaining = duration;
    loop {
        let started = std::time::Instant::now();
        let wait_ms = remaining.as_millis().try_into().unwrap_or(INFINITE);
        let outcome = self.try_select_wait(wait_ms);
        if !matches!(outcome, Err(OsTrySelectError::Empty)) {
            return outcome;
        }
        // Nothing yet: charge the time spent against the budget and retry if any is left.
        match remaining.checked_sub(started.elapsed()) {
            Some(left) => remaining = left,
            None => return outcome,
        }
    }
}
/// Shared implementation of `try_select` and `try_select_timeout`: like `select`, but
/// each wait on the completion port is limited to `wait` milliseconds.
///
/// With no receivers at all, the call sleeps for `wait` milliseconds and returns
/// `OsTrySelectError::Empty`.
///
/// # Errors
/// * `OsTrySelectError::Empty` when a wait times out before any event was gathered.
/// * `OsTrySelectError::IoError` for completion-port, message-extraction and read-start
///   failures other than `ChannelClosed`, which is reported as an event instead.
fn try_select_wait(
    &mut self,
    wait: u32,
) -> Result<Vec<OsIpcSelectionResult>, OsTrySelectError> {
    if self.readers.is_empty() && self.closed_readers.is_empty() {
        thread::sleep(Duration::from_millis(wait as u64));
        win32_trace!(
            "[# {:?}] try_select_wait with 0 active and 0 closed receivers returning Empty",
            self.iocp.as_raw()
        );
        return Err(OsTrySelectError::Empty);
    }
    win32_trace!(
        "[# {:?}] try_select_wait() with {} active and {} closed receivers",
        self.iocp.as_raw(),
        self.readers.len(),
        self.closed_readers.len()
    );
    // Receivers that were already closed when added are reported immediately.
    let mut selection_results: Vec<OsIpcSelectionResult> = self
        .closed_readers
        .drain(..)
        .map(OsIpcSelectionResult::ChannelClosed)
        .collect();
    while selection_results.is_empty() {
        let (mut reader, result) = match self.fetch_iocp_result(wait) {
            Ok(completed) => completed,
            Err(winerr) if winerr.code() == HRESULT::from_win32(WAIT_TIMEOUT.0) => {
                return Err(OsTrySelectError::Empty);
            },
            Err(winerr) => return Err(OsTrySelectError::IoError(winerr.into())),
        };
        let mut closed = match result {
            Ok(()) => false,
            Err(WinIpcError::ChannelClosed) => true,
            Err(e) => return Err(e.into()),
        };
        if !closed {
            // One completed read may contain several messages; report them all.
            while let Some(ipc_message) = reader.get_message()? {
                win32_trace!(
                    "[# {:?}] receiver {:?} ({}) got a message",
                    self.iocp.as_raw(),
                    reader.get_raw_handle(),
                    reader.entry_id.unwrap()
                );
                selection_results.push(OsIpcSelectionResult::DataReceived(
                    reader.entry_id.unwrap(),
                    ipc_message,
                ));
            }
            win32_trace!(
                "[# {:?}] receiver {:?} ({}) -- no message",
                self.iocp.as_raw(),
                reader.get_raw_handle(),
                reader.entry_id.unwrap()
            );
            // Re-arm the reader and put it back into the set.
            closed = match reader.start_read() {
                Ok(()) => {
                    self.readers.push(reader.take());
                    false
                },
                Err(WinIpcError::ChannelClosed) => true,
                Err(e) => return Err(e.into()),
            };
        }
        if closed {
            win32_trace!(
                "[# {:?}] receiver {:?} ({}) -- now closed!",
                self.iocp.as_raw(),
                reader.get_raw_handle(),
                reader.entry_id.unwrap()
            );
            selection_results.push(OsIpcSelectionResult::ChannelClosed(
                reader.entry_id.unwrap(),
            ));
        }
    }
    if selection_results.is_empty() {
        win32_trace!("try_select_wait() -> Empty");
        Err(OsTrySelectError::Empty)
    } else {
        win32_trace!("try_select_wait() -> {} results", selection_results.len());
        Ok(selection_results)
    }
}
}
impl OsIpcSelectionResult {
pub fn unwrap(self) -> (u64, IpcMessage) {
match self {
OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
OsIpcSelectionResult::ChannelClosed(id) => {
panic!(
"OsIpcSelectionResult::unwrap(): receiver ID {} was closed!",
id
)
},
}
}
}
/// A block of shared memory: a file-mapping handle together with a view of it
/// mapped into this process.
///
/// The mapped bytes are exposed as a `[u8]` slice through `Deref`.
#[derive(Debug)]
pub struct OsIpcSharedMemory {
/// Handle that is passed to `MapViewOfFile` to create the view.
handle: WinHandle,
/// Address of the mapped view; it is released with `UnmapViewOfFile` on drop.
view_handle: MEMORY_MAPPED_VIEW_ADDRESS,
/// Number of bytes exposed by the slices built on top of the view.
length: usize,
}
// NOTE(review): these manual impls are presumably needed because the view
// address holds a raw pointer. Nothing visible here synchronizes access to the
// mapped bytes, so soundness relies on how callers use the region — confirm.
unsafe impl Send for OsIpcSharedMemory {}
unsafe impl Sync for OsIpcSharedMemory {}
impl Drop for OsIpcSharedMemory {
    /// Unmaps the view of the shared memory region.
    ///
    /// A failed unmap triggers an assertion failure, unless the thread is
    /// already panicking (presumably to avoid a panic during unwinding).
    fn drop(&mut self) {
        // SAFETY: `view_handle` is the address obtained from `MapViewOfFile`
        // when this value was built, and it is unmapped only here.
        let result = unsafe { UnmapViewOfFile(self.view_handle) };
        assert!(result.is_ok() || thread::panicking());
    }
}
impl Clone for OsIpcSharedMemory {
    /// Produces a second `OsIpcSharedMemory` by duplicating the handle and
    /// mapping a new view of it with the same length.
    ///
    /// # Panics
    ///
    /// Panics if duplicating the handle or mapping the view fails.
    fn clone(&self) -> OsIpcSharedMemory {
        let duplicate = dup_handle(&self.handle).unwrap();
        Self::from_handle(duplicate, self.length).unwrap()
    }
}
impl PartialEq for OsIpcSharedMemory {
fn eq(&self, other: &OsIpcSharedMemory) -> bool {
self.handle == other.handle
}
}
impl Deref for OsIpcSharedMemory {
    type Target = [u8];

    /// Borrows the mapped view as a byte slice of `length` bytes.
    ///
    /// # Panics
    ///
    /// Panics if the view address is null or the handle is not valid.
    #[inline]
    fn deref(&self) -> &[u8] {
        assert!(!self.view_handle.Value.is_null() && self.handle.is_valid());
        let base = self.view_handle.Value as *const u8;
        // SAFETY: the view address was checked to be non-null above; the
        // region is assumed to span at least `length` bytes, as recorded when
        // this value was constructed.
        unsafe { slice::from_raw_parts(base, self.length) }
    }
}
impl OsIpcSharedMemory {
    /// Borrows the mapped view as a mutable byte slice of `length` bytes.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller presumably has to guarantee that nothing else
    /// reads or writes the mapped region while the returned slice is alive —
    /// confirm the exact contract.
    ///
    /// # Panics
    ///
    /// Panics if the view address is null or the handle is not valid.
    #[inline]
    pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
        assert!(!self.view_handle.Value.is_null() && self.handle.is_valid());
        let base = self.view_handle.Value as *mut u8;
        unsafe { slice::from_raw_parts_mut(base, self.length) }
    }

    /// Consumes the region and returns a copy of its bytes.
    ///
    /// In this implementation the result is always `Some`.
    pub fn take(self) -> Option<Vec<u8>> {
        let bytes: &[u8] = &self;
        Some(bytes.to_vec())
    }
}
impl OsIpcSharedMemory {
    /// Creates a file mapping of `length` bytes that is not backed by a file
    /// (`INVALID_HANDLE_VALUE`) and maps a view of it.
    ///
    /// # Panics
    ///
    /// Panics if `length` is not smaller than `u32::MAX`.
    ///
    /// # Errors
    ///
    /// Returns the Windows error if creating the mapping or mapping the view
    /// fails.
    fn new(length: usize) -> Result<OsIpcSharedMemory, WinError> {
        assert!(length < u32::MAX as usize);
        // The size is handed to the API as two 32-bit halves. `checked_shr`
        // yields `None` when `usize` is only 32 bits wide, in which case the
        // high half is zero.
        let size_high = length.checked_shr(32).unwrap_or(0) as u32;
        let size_low = length as u32;
        // SAFETY: plain FFI call; no pointers owned by Rust are passed in.
        let mapping = unsafe {
            CreateFileMappingA(
                INVALID_HANDLE_VALUE,
                None,
                PAGE_READWRITE | SEC_COMMIT,
                size_high,
                size_low,
                None,
            )
        }?;
        OsIpcSharedMemory::from_handle(WinHandle::new(mapping), length)
    }

    /// Maps a view of the mapping referred to by `handle` and wraps both in an
    /// `OsIpcSharedMemory` that reports `length` bytes.
    ///
    /// `length` is taken on trust: it is not checked against the real size of
    /// the mapping.
    ///
    /// # Errors
    ///
    /// Returns the calling thread's last Windows error if the view address
    /// comes back null.
    fn from_handle(handle: WinHandle, length: usize) -> Result<OsIpcSharedMemory, WinError> {
        // SAFETY: FFI call on the raw handle; offset 0 and size 0 are passed,
        // which per the API maps from the start to the end of the mapping.
        let view = unsafe { MapViewOfFile(handle.as_raw(), FILE_MAP_ALL_ACCESS, 0, 0, 0) };
        if view.Value.is_null() {
            return Err(WinError::from_win32());
        }
        Ok(OsIpcSharedMemory {
            handle,
            view_handle: view,
            length,
        })
    }

    /// Creates a region of `length` bytes with every byte set to `byte`.
    ///
    /// # Panics
    ///
    /// Panics if the region cannot be created.
    pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
        let mem = OsIpcSharedMemory::new(length).unwrap();
        let dst = mem.view_handle.Value as *mut u8;
        // SAFETY: `dst` is the non-null view that was just mapped for a region
        // created with `mem.length` bytes, and nothing else refers to it yet.
        unsafe { ptr::write_bytes(dst, byte, mem.length) };
        mem
    }

    /// Creates a region of `bytes.len()` bytes holding a copy of `bytes`.
    ///
    /// # Panics
    ///
    /// Panics if the region cannot be created.
    pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
        let mem = OsIpcSharedMemory::new(bytes.len()).unwrap();
        let dst = mem.view_handle.Value as *mut u8;
        // SAFETY: `dst` is the freshly mapped view of a region created with
        // `bytes.len()` bytes, so it cannot overlap the borrowed source slice.
        unsafe { ptr::copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len()) };
        mem
    }
}
/// A server that accepts a single connection: it owns the receiver created on
/// a freshly named pipe and gives it up in `accept`.
pub struct OsIpcOneShotServer {
/// Receiver bound to the named pipe created in `new`.
receiver: OsIpcReceiver,
}
impl OsIpcOneShotServer {
    /// Creates a server listening on a newly generated pipe.
    ///
    /// Returns the server together with the pipe ID rendered as a string; the
    /// pipe name itself is derived from that ID.
    ///
    /// # Errors
    ///
    /// Returns the Windows error if the named receiver cannot be created.
    pub fn new() -> Result<(OsIpcOneShotServer, String), WinError> {
        let pipe_id = make_pipe_id();
        let receiver = OsIpcReceiver::new_named(&make_pipe_name(&pipe_id))?;
        let server = OsIpcOneShotServer { receiver };
        Ok((server, pipe_id.to_string()))
    }

    /// Consumes the server: accepts on the receiver, then receives one message.
    ///
    /// Returns the receiver along with that first message.
    ///
    /// # Errors
    ///
    /// Propagates any error from accepting or from receiving.
    pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), WinIpcError> {
        let OsIpcOneShotServer { receiver } = self;
        receiver.accept()?;
        let first_message = receiver.recv()?;
        Ok((receiver, first_message))
    }
}
/// One end of an IPC channel: either a sender or a receiver.
pub enum OsIpcChannel {
/// The sending end.
Sender(OsIpcSender),
/// The receiving end.
Receiver(OsIpcReceiver),
}
/// A channel end held as a bare handle, before it has been turned into a
/// sender or a receiver with `to_sender` / `to_receiver`.
#[derive(Debug)]
pub struct OsOpaqueIpcChannel {
/// The wrapped handle; it is taken out by `to_receiver` / `to_sender`.
handle: WinHandle,
}
impl Drop for OsOpaqueIpcChannel {
/// In debug builds, asserts that the handle is no longer valid at drop time.
///
/// NOTE(review): presumably this catches opaque channels that were dropped
/// without being converted via `to_receiver` / `to_sender` — confirm.
fn drop(&mut self) {
debug_assert!(!self.handle.is_valid());
}
}
impl OsOpaqueIpcChannel {
    /// Wraps `handle` without deciding yet which end of a channel it is.
    fn new(handle: WinHandle) -> OsOpaqueIpcChannel {
        Self { handle }
    }

    /// Takes the handle out of this wrapper and builds a receiver from it.
    pub fn to_receiver(&mut self) -> OsIpcReceiver {
        let taken = self.handle.take();
        OsIpcReceiver::from_handle(taken)
    }

    /// Takes the handle out of this wrapper and builds a sender from it.
    pub fn to_sender(&mut self) -> OsIpcSender {
        let taken = self.handle.take();
        OsIpcSender::from_handle(taken)
    }
}
/// Errors reported by the Windows IPC backend.
#[derive(Debug, Error)]
pub enum WinIpcError {
/// An error reported by the Windows API; convertible from `WinError` via `From`.
#[error("Windows Error {0}.")]
WinError(#[from] WinError),
/// The channel is closed; converted to `IpcError::Disconnected` for callers.
#[error("Channel Closed.")]
ChannelClosed,
/// No data is available; converted to `TryRecvError::Empty` or a
/// `WouldBlock` I/O error.
#[error("No Data.")]
NoData,
}
impl WinIpcError {
    /// Returns `true` only for the `ChannelClosed` variant.
    pub fn channel_is_closed(&self) -> bool {
        match self {
            Self::ChannelClosed => true,
            Self::WinError(_) | Self::NoData => false,
        }
    }
}
impl From<WinIpcError> for crate::IpcError {
fn from(error: WinIpcError) -> Self {
match error {
WinIpcError::ChannelClosed => crate::IpcError::Disconnected,
e => crate::IpcError::Io(io::Error::from(e)),
}
}
}
impl From<WinIpcError> for crate::TryRecvError {
fn from(error: WinIpcError) -> Self {
match error {
WinIpcError::ChannelClosed => {
crate::TryRecvError::IpcError(crate::IpcError::Disconnected)
},
WinIpcError::NoData => crate::TryRecvError::Empty,
e => crate::TryRecvError::IpcError(crate::IpcError::Io(io::Error::from(e))),
}
}
}
impl From<WinIpcError> for io::Error {
    /// Converts a backend error into a standard I/O error.
    ///
    /// * `NoData` becomes a `WouldBlock` error with a fixed message.
    /// * `ChannelClosed` becomes the OS error `ERROR_BROKEN_PIPE`.
    /// * `WinError` becomes an OS error built from the error's code.
    ///
    /// NOTE(review): for `WinError` the value passed on is the raw `code()`
    /// value (an HRESULT), not necessarily a plain Win32 error number —
    /// confirm that callers inspecting `raw_os_error()` expect this.
    fn from(error: WinIpcError) -> io::Error {
        let raw_code = match error {
            WinIpcError::NoData => {
                return io::Error::new(
                    io::ErrorKind::WouldBlock,
                    "Win channel has no data available",
                );
            },
            WinIpcError::ChannelClosed => ERROR_BROKEN_PIPE.0 as i32,
            WinIpcError::WinError(err) => err.code().0,
        };
        io::Error::from_raw_os_error(raw_code)
    }
}