Skip to main content

microsandbox_utils/
wake_pipe.rs

1//! Cross-platform wake notification.
2//!
3//! Works on both Linux and macOS (unlike `eventfd` which is Linux-only).
4//! The write end signals, the read end is pollable via `epoll`/`kqueue`/`poll`.
5
6#[cfg(unix)]
7use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
8#[cfg(windows)]
9use std::os::windows::io::RawHandle;
10use std::time::Duration;
11#[cfg(windows)]
12use windows_sys::Win32::Foundation::{
13    CloseHandle, HANDLE, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT,
14};
15#[cfg(windows)]
16use windows_sys::Win32::System::Threading::{
17    CreateEventW, ResetEvent, SetEvent, WaitForSingleObject,
18};
19
20//--------------------------------------------------------------------------------------------------
21// Types
22//--------------------------------------------------------------------------------------------------
23
24/// Cross-platform wake notification.
25///
26/// On Unix, the write end signals and the read end is pollable via
27/// `epoll`/`kqueue`/`poll`. On Windows, wakeups are coalesced behind a
28/// manual-reset event for code that needs a blocking wait.
29#[cfg(unix)]
30pub struct WakePipe {
31    read_fd: OwnedFd,
32    write_fd: OwnedFd,
33}
34
35/// Cross-platform wake notification.
36///
37/// Windows wakeups are coalesced behind a manual-reset event. Multiple
38/// [`wake`](Self::wake) calls before [`drain`](Self::drain) still represent one
39/// readable/ready state, matching the Unix pipe behavior needed by queue
40/// wakeups.
41#[cfg(windows)]
42pub struct WakePipe {
43    handle: HANDLE,
44}
45
46//--------------------------------------------------------------------------------------------------
47// Methods
48//--------------------------------------------------------------------------------------------------
49
50impl WakePipe {
51    /// Create a new wake pipe.
52    ///
53    /// Both ends are set to non-blocking and close-on-exec.
54    #[cfg(unix)]
55    pub fn new() -> Self {
56        let mut fds = [0i32; 2];
57
58        // SAFETY: pipe() is a standard POSIX call. We check the return value
59        // and immediately wrap the raw fds in OwnedFd for RAII cleanup.
60        let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
61        assert!(
62            ret == 0,
63            "pipe() failed: {}",
64            std::io::Error::last_os_error()
65        );
66
67        // Set non-blocking and close-on-exec on both ends.
68        // SAFETY: fds are valid open file descriptors from the pipe() call above.
69        unsafe {
70            set_nonblock_cloexec(fds[0]);
71            set_nonblock_cloexec(fds[1]);
72        }
73
74        Self {
75            // SAFETY: fds are valid and not owned by anything else yet.
76            read_fd: unsafe { OwnedFd::from_raw_fd(fds[0]) },
77            write_fd: unsafe { OwnedFd::from_raw_fd(fds[1]) },
78        }
79    }
80
81    /// Create a new wake primitive.
82    #[cfg(windows)]
83    pub fn new() -> Self {
84        let handle = unsafe { CreateEventW(std::ptr::null(), 1, 0, std::ptr::null()) };
85        if handle.is_null() {
86            panic!("CreateEventW failed: {}", std::io::Error::last_os_error());
87        }
88        Self { handle }
89    }
90
91    /// Signal the reader. Safe to call from any thread, multiple times.
92    ///
93    /// Writes a single byte. If the pipe buffer is full the write is silently
94    /// dropped — the reader will still wake because there are unread bytes.
95    #[cfg(unix)]
96    pub fn wake(&self) {
97        // SAFETY: write_fd is a valid, non-blocking file descriptor.
98        // Writing 1 byte to a pipe is atomic on all POSIX systems.
99        unsafe {
100            libc::write(self.write_fd.as_raw_fd(), [1u8].as_ptr().cast(), 1);
101        }
102    }
103
104    /// Signal the reader. Safe to call from any thread, multiple times.
105    #[cfg(windows)]
106    pub fn wake(&self) {
107        unsafe {
108            SetEvent(self.handle);
109        }
110    }
111
112    /// Drain all pending wake signals. Call after processing to reset the
113    /// pipe for the next edge-triggered notification.
114    #[cfg(unix)]
115    pub fn drain(&self) {
116        let mut buf = [0u8; 512];
117        loop {
118            // SAFETY: read_fd is a valid, non-blocking file descriptor.
119            let n =
120                unsafe { libc::read(self.read_fd.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len()) };
121            if n <= 0 {
122                break;
123            }
124        }
125    }
126
127    /// Drain all pending wake signals.
128    #[cfg(windows)]
129    pub fn drain(&self) {
130        unsafe {
131            ResetEvent(self.handle);
132        }
133    }
134
135    /// Block until the wake primitive is signaled or the timeout expires.
136    ///
137    /// Returns `true` if the primitive was signaled and `false` on timeout.
138    pub fn wait_timeout(&self, timeout: Duration) -> bool {
139        wait_timeout(self, timeout)
140    }
141
142    /// File descriptor for `epoll`/`kqueue`/`poll(2)` registration.
143    ///
144    /// Becomes readable when [`wake()`](Self::wake) has been called.
145    #[cfg(unix)]
146    pub fn as_raw_fd(&self) -> RawFd {
147        self.read_fd.as_raw_fd()
148    }
149
150    /// Waitable handle for Windows wait APIs.
151    #[cfg(windows)]
152    pub fn as_raw_handle(&self) -> RawHandle {
153        self.handle as RawHandle
154    }
155}
156
157//--------------------------------------------------------------------------------------------------
158// Trait Implementations
159//--------------------------------------------------------------------------------------------------
160
161impl Default for WakePipe {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167#[cfg(windows)]
168unsafe impl Send for WakePipe {}
169
170#[cfg(windows)]
171unsafe impl Sync for WakePipe {}
172
173#[cfg(windows)]
174impl Drop for WakePipe {
175    fn drop(&mut self) {
176        unsafe {
177            CloseHandle(self.handle);
178        }
179    }
180}
181
182//--------------------------------------------------------------------------------------------------
183// Functions
184//--------------------------------------------------------------------------------------------------
185
186/// Set `O_NONBLOCK` and `FD_CLOEXEC` on a file descriptor.
187///
188/// # Safety
189///
190/// `fd` must be a valid, open file descriptor.
191#[cfg(unix)]
192unsafe fn set_nonblock_cloexec(fd: RawFd) {
193    unsafe {
194        // Set non-blocking.
195        let flags = libc::fcntl(fd, libc::F_GETFL);
196        assert!(
197            flags >= 0,
198            "fcntl(F_GETFL) failed: {}",
199            std::io::Error::last_os_error()
200        );
201        let ret = libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
202        assert!(
203            ret >= 0,
204            "fcntl(F_SETFL) failed: {}",
205            std::io::Error::last_os_error()
206        );
207
208        // Set close-on-exec.
209        let flags = libc::fcntl(fd, libc::F_GETFD);
210        assert!(
211            flags >= 0,
212            "fcntl(F_GETFD) failed: {}",
213            std::io::Error::last_os_error()
214        );
215        let ret = libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
216        assert!(
217            ret >= 0,
218            "fcntl(F_SETFD) failed: {}",
219            std::io::Error::last_os_error()
220        );
221    }
222}
223
224#[cfg(unix)]
225fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
226    let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;
227    poll_fd_readable_timeout(pipe.as_raw_fd(), timeout_ms)
228}
229
230#[cfg(unix)]
231fn poll_fd_readable_timeout(fd: RawFd, timeout_ms: i32) -> bool {
232    loop {
233        let mut pfd = libc::pollfd {
234            fd,
235            events: libc::POLLIN,
236            revents: 0,
237        };
238        // SAFETY: pfd is a valid stack-allocated pollfd.
239        let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
240        if ret > 0 {
241            return true;
242        }
243        if ret == 0 {
244            return false;
245        }
246
247        let errno = std::io::Error::last_os_error();
248        if errno.raw_os_error() != Some(libc::EINTR) {
249            return false;
250        }
251    }
252}
253
254#[cfg(windows)]
255fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
256    let timeout_ms = timeout.as_millis().min(u32::MAX as u128) as u32;
257    let result = unsafe { WaitForSingleObject(pipe.handle, timeout_ms) };
258    match result {
259        WAIT_OBJECT_0 => true,
260        WAIT_TIMEOUT | WAIT_FAILED => false,
261        _ => false,
262    }
263}
264
265//--------------------------------------------------------------------------------------------------
266// Tests
267//--------------------------------------------------------------------------------------------------
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272
273    #[test]
274    fn wake_and_drain() {
275        let pipe = WakePipe::new();
276        // Initially no data — drain is a no-op.
277        pipe.drain();
278
279        // Wake then drain.
280        pipe.wake();
281        pipe.wake();
282        pipe.drain();
283
284        // After drain, another wake should work.
285        pipe.wake();
286        pipe.drain();
287    }
288
289    #[cfg(unix)]
290    #[test]
291    fn fd_is_valid() {
292        let pipe = WakePipe::new();
293        let fd = pipe.as_raw_fd();
294        assert!(fd >= 0);
295    }
296
297    #[test]
298    fn nonblocking_read() {
299        let pipe = WakePipe::new();
300        // Reading from an empty non-blocking pipe should not block.
301        pipe.drain();
302    }
303
304    #[test]
305    fn wait_timeout_observes_wake() {
306        let pipe = WakePipe::new();
307
308        assert!(!pipe.wait_timeout(Duration::from_millis(1)));
309        pipe.wake();
310        assert!(pipe.wait_timeout(Duration::from_secs(1)));
311        pipe.drain();
312        assert!(!pipe.wait_timeout(Duration::from_millis(1)));
313    }
314}