#[cfg(unix)]
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::RawHandle;
use std::time::Duration;
#[cfg(windows)]
use windows_sys::Win32::Foundation::{
CloseHandle, HANDLE, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT,
};
#[cfg(windows)]
use windows_sys::Win32::System::Threading::{
CreateEventW, ResetEvent, SetEvent, WaitForSingleObject,
};
/// Self-wake primitive backed by an anonymous pipe (Unix variant).
///
/// Writing a byte to `write_fd` makes `read_fd` readable, which is what
/// waiters poll for. Both descriptors are closed automatically when the
/// value is dropped because they are held as [`OwnedFd`]s.
#[cfg(unix)]
#[derive(Debug)]
pub struct WakePipe {
    /// Read end of the pipe: polled by waiters and emptied by `drain`.
    read_fd: OwnedFd,
    /// Write end of the pipe: receives one byte per `wake` call.
    write_fd: OwnedFd,
}
/// Self-wake primitive backed by a Win32 event object (Windows variant).
///
/// The handle is created with `CreateEventW` and released with
/// `CloseHandle` when the value is dropped.
#[cfg(windows)]
#[derive(Debug)]
pub struct WakePipe {
    /// Event handle that is set by `wake`, reset by `drain`, and waited on
    /// by `wait_timeout`.
    handle: HANDLE,
}
impl WakePipe {
    /// Creates a wake pipe backed by an anonymous pipe.
    ///
    /// Both ends are switched to non-blocking, close-on-exec mode.
    ///
    /// # Panics
    ///
    /// Panics if `pipe()` or any of the `fcntl()` configuration calls fails.
    #[cfg(unix)]
    pub fn new() -> Self {
        let mut fds = [0i32; 2];
        // SAFETY: `fds` is a writable array of two ints, which is exactly
        // what `pipe()` fills in.
        let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
        assert!(
            ret == 0,
            "pipe() failed: {}",
            std::io::Error::last_os_error()
        );
        // Take ownership before configuring the descriptors: if one of the
        // `fcntl` assertions panics, unwinding drops the `OwnedFd`s and
        // closes both ends instead of leaking them.
        // SAFETY: `pipe()` succeeded, so both descriptors are open and
        // nothing else owns them yet.
        let (read_fd, write_fd) =
            unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) };
        // SAFETY: both descriptors are open and stay open for the duration
        // of the calls because the `OwnedFd`s above are still alive.
        unsafe {
            set_nonblock_cloexec(read_fd.as_raw_fd());
            set_nonblock_cloexec(write_fd.as_raw_fd());
        }
        Self { read_fd, write_fd }
    }

    /// Creates a wake pipe backed by an unnamed, manual-reset event that
    /// starts out non-signalled.
    ///
    /// # Panics
    ///
    /// Panics if `CreateEventW` returns a null handle.
    #[cfg(windows)]
    pub fn new() -> Self {
        // SAFETY: null security attributes and a null name are accepted by
        // `CreateEventW`; the remaining arguments are plain integers
        // (manual reset = 1, initial state = 0).
        let handle = unsafe { CreateEventW(std::ptr::null(), 1, 0, std::ptr::null()) };
        if handle.is_null() {
            panic!("CreateEventW failed: {}", std::io::Error::last_os_error());
        }
        Self { handle }
    }

    /// Signals the pipe by writing a single byte to its write end.
    ///
    /// This is best-effort: apart from retrying when the write is
    /// interrupted by a signal, errors are ignored. In particular a full
    /// pipe already holds unread bytes, so the read end is readable anyway.
    #[cfg(unix)]
    pub fn wake(&self) {
        let byte = [1u8];
        loop {
            // SAFETY: `byte` is a live one-byte buffer and `write_fd` is an
            // open descriptor owned by `self`.
            let written =
                unsafe { libc::write(self.write_fd.as_raw_fd(), byte.as_ptr().cast(), 1) };
            // Retry only on EINTR so an ill-timed signal cannot swallow the
            // wake-up; every other outcome ends the attempt.
            let interrupted = written < 0
                && std::io::Error::last_os_error().kind() == std::io::ErrorKind::Interrupted;
            if !interrupted {
                break;
            }
        }
    }

    /// Signals the pipe by setting the event. The result of `SetEvent` is
    /// ignored (best-effort).
    #[cfg(windows)]
    pub fn wake(&self) {
        // SAFETY: `self.handle` is the event handle created in `new` and is
        // only closed when `self` is dropped.
        unsafe {
            SetEvent(self.handle);
        }
    }

    /// Discards every byte currently buffered in the pipe so that the read
    /// end stops being readable.
    ///
    /// Reads are repeated until one returns no data (end of file, or an
    /// error such as "would block" on the non-blocking read end). A read
    /// that was merely interrupted by a signal is retried so pending bytes
    /// are not left behind.
    #[cfg(unix)]
    pub fn drain(&self) {
        let mut buf = [0u8; 512];
        loop {
            // SAFETY: `buf` is a live, writable buffer of `buf.len()` bytes
            // and `read_fd` is an open descriptor owned by `self`.
            let n = unsafe {
                libc::read(self.read_fd.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len())
            };
            if n > 0 {
                // Got data; there may be more.
                continue;
            }
            if n < 0
                && std::io::Error::last_os_error().kind() == std::io::ErrorKind::Interrupted
            {
                // Interrupted before anything was read: try again.
                continue;
            }
            break;
        }
    }

    /// Clears the signalled state by resetting the event. The result of
    /// `ResetEvent` is ignored (best-effort).
    #[cfg(windows)]
    pub fn drain(&self) {
        // SAFETY: `self.handle` is the event handle created in `new` and is
        // only closed when `self` is dropped.
        unsafe {
            ResetEvent(self.handle);
        }
    }

    /// Waits up to `timeout` for the pipe to be signalled.
    ///
    /// Returns `true` if the platform wait reported the pipe as signalled
    /// and `false` otherwise (timeout or wait failure). Waiting does not
    /// consume the signal; call [`WakePipe::drain`] to clear it.
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        // Resolves to the platform-specific free function of the same name.
        wait_timeout(self, timeout)
    }

    /// Returns the raw descriptor of the pipe's read end. Ownership is not
    /// transferred; the descriptor is still closed when `self` is dropped.
    #[cfg(unix)]
    pub fn as_raw_fd(&self) -> RawFd {
        self.read_fd.as_raw_fd()
    }

    /// Returns the raw event handle. Ownership is not transferred; the
    /// handle is still closed when `self` is dropped.
    #[cfg(windows)]
    pub fn as_raw_handle(&self) -> RawHandle {
        self.handle as RawHandle
    }
}
impl Default for WakePipe {
fn default() -> Self {
Self::new()
}
}
// The Windows `WakePipe` holds nothing but its event `HANDLE`. These impls
// are written by hand, presumably because `HANDLE` is a raw pointer here
// (`new` calls `handle.is_null()`), which is neither `Send` nor `Sync` on its
// own.
// NOTE(review): soundness assumes `SetEvent`, `ResetEvent`,
// `WaitForSingleObject` and `CloseHandle` may be called on the same event
// handle from any thread — confirm against the Win32 documentation.
#[cfg(windows)]
unsafe impl Send for WakePipe {}
// See the note above the `Send` impl; the same assumption covers shared
// (`&WakePipe`) access from several threads.
#[cfg(windows)]
unsafe impl Sync for WakePipe {}
#[cfg(windows)]
impl Drop for WakePipe {
    /// Releases the event handle. The result of `CloseHandle` is discarded.
    fn drop(&mut self) {
        // SAFETY: `self.handle` was obtained from `CreateEventW` in `new`
        // and this is the only place in this file that closes it.
        let _ = unsafe { CloseHandle(self.handle) };
    }
}
/// Adds `O_NONBLOCK` to the file-status flags and `FD_CLOEXEC` to the
/// descriptor flags of `fd`, keeping whatever flags were already set.
///
/// # Safety
///
/// The function passes `fd` straight to `fcntl`; callers are presumably
/// expected to hand in a descriptor that is open for the whole call.
///
/// # Panics
///
/// Panics if any of the four `fcntl` calls reports an error.
#[cfg(unix)]
unsafe fn set_nonblock_cloexec(fd: RawFd) {
    // Step 1: file-status flags (F_GETFL / F_SETFL) gain O_NONBLOCK.
    let status_flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    assert!(
        status_flags >= 0,
        "fcntl(F_GETFL) failed: {}",
        std::io::Error::last_os_error()
    );
    let set_status = unsafe { libc::fcntl(fd, libc::F_SETFL, status_flags | libc::O_NONBLOCK) };
    assert!(
        set_status >= 0,
        "fcntl(F_SETFL) failed: {}",
        std::io::Error::last_os_error()
    );

    // Step 2: descriptor flags (F_GETFD / F_SETFD) gain FD_CLOEXEC.
    let descriptor_flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
    assert!(
        descriptor_flags >= 0,
        "fcntl(F_GETFD) failed: {}",
        std::io::Error::last_os_error()
    );
    let set_descriptor =
        unsafe { libc::fcntl(fd, libc::F_SETFD, descriptor_flags | libc::FD_CLOEXEC) };
    assert!(
        set_descriptor >= 0,
        "fcntl(F_SETFD) failed: {}",
        std::io::Error::last_os_error()
    );
}
/// Unix backend of `WakePipe::wait_timeout`: polls the pipe's read end for
/// readability for at most `timeout`.
///
/// `poll` takes whole milliseconds as an `i32`, so the duration is rounded
/// *up* to the next millisecond and clamped to `i32::MAX`. Rounding up
/// matters: truncation would turn any non-zero timeout shorter than 1 ms
/// into a zero-timeout (non-blocking) poll, and a caller looping on such a
/// timeout would spin instead of sleeping.
///
/// Returns `true` if the read end became readable, `false` on timeout or
/// poll failure.
#[cfg(unix)]
fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
    // 1 when there is a fractional millisecond that `as_millis` would drop.
    let round_up = u128::from(timeout.subsec_nanos() % 1_000_000 != 0);
    let timeout_ms = i32::try_from(timeout.as_millis() + round_up).unwrap_or(i32::MAX);
    poll_fd_readable_timeout(pipe.as_raw_fd(), timeout_ms)
}
#[cfg(unix)]
fn poll_fd_readable_timeout(fd: RawFd, timeout_ms: i32) -> bool {
loop {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
if ret > 0 {
return true;
}
if ret == 0 {
return false;
}
let errno = std::io::Error::last_os_error();
if errno.raw_os_error() != Some(libc::EINTR) {
return false;
}
}
}
/// Windows backend of `WakePipe::wait_timeout`: waits on the event handle
/// for at most `timeout`.
///
/// `WaitForSingleObject` takes whole milliseconds as a `u32`, where
/// `u32::MAX` is the `INFINITE` sentinel. The duration is therefore rounded
/// *up* to the next millisecond (so a non-zero sub-millisecond timeout does
/// not become a zero-timeout check) and clamped to `u32::MAX - 1` (so a
/// large but finite timeout never turns into an unbounded wait).
///
/// Returns `true` only when the wait reports `WAIT_OBJECT_0`; timeouts,
/// failures and any other result yield `false`.
#[cfg(windows)]
fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
    // Largest finite timeout: one below the `INFINITE` sentinel.
    const MAX_FINITE_MS: u32 = u32::MAX - 1;
    // 1 when there is a fractional millisecond that `as_millis` would drop.
    let round_up = u128::from(timeout.subsec_nanos() % 1_000_000 != 0);
    let timeout_ms = u32::try_from(timeout.as_millis() + round_up)
        .map_or(MAX_FINITE_MS, |ms| ms.min(MAX_FINITE_MS));
    // SAFETY: `pipe.handle` is the event handle created in `WakePipe::new`
    // and stays open while `pipe` is borrowed.
    let result = unsafe { WaitForSingleObject(pipe.handle, timeout_ms) };
    match result {
        WAIT_OBJECT_0 => true,
        WAIT_TIMEOUT | WAIT_FAILED => false,
        // Any other status is treated like a failed wait.
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Interleaved wake/drain calls, including draining an empty pipe and
    /// waking twice in a row, must complete without panicking or blocking.
    #[test]
    fn wake_and_drain() {
        let waker = WakePipe::new();
        waker.drain();
        for _ in 0..2 {
            waker.wake();
        }
        waker.drain();
        waker.wake();
        waker.drain();
    }

    /// The descriptor exposed for polling is a non-negative fd.
    #[cfg(unix)]
    #[test]
    fn fd_is_valid() {
        let waker = WakePipe::new();
        assert!(waker.as_raw_fd() >= 0);
    }

    /// Draining a freshly created, never-signalled pipe must return rather
    /// than block.
    #[test]
    fn nonblocking_read() {
        WakePipe::new().drain();
    }

    /// `wait_timeout` times out while idle, reports a pending wake, and
    /// times out again once the wake has been drained.
    #[test]
    fn wait_timeout_observes_wake() {
        let waker = WakePipe::new();
        let short = Duration::from_millis(1);

        assert!(!waker.wait_timeout(short));

        waker.wake();
        assert!(waker.wait_timeout(Duration::from_secs(1)));

        waker.drain();
        assert!(!waker.wait_timeout(short));
    }
}