microsandbox_utils/
wake_pipe.rs1#[cfg(unix)]
7use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
8#[cfg(windows)]
9use std::os::windows::io::RawHandle;
10use std::time::Duration;
11#[cfg(windows)]
12use windows_sys::Win32::Foundation::{
13 CloseHandle, HANDLE, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT,
14};
15#[cfg(windows)]
16use windows_sys::Win32::System::Threading::{
17 CreateEventW, ResetEvent, SetEvent, WaitForSingleObject,
18};
19
/// Cross-thread wake-up primitive; on Unix it is a pipe whose two ends are
/// owned by this struct and closed automatically when it is dropped.
#[cfg(unix)]
#[derive(Debug)]
pub struct WakePipe {
    /// Read end of the pipe: polled for readability and emptied by `drain`.
    read_fd: OwnedFd,
    /// Write end of the pipe: `wake` writes one byte here.
    write_fd: OwnedFd,
}
34
/// Cross-thread wake-up primitive; on Windows it wraps a single event object
/// handle that is closed when the struct is dropped.
#[cfg(windows)]
#[derive(Debug)]
pub struct WakePipe {
    /// Event handle returned by `CreateEventW`.
    handle: HANDLE,
}
45
46impl WakePipe {
51 #[cfg(unix)]
55 pub fn new() -> Self {
56 let mut fds = [0i32; 2];
57
58 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
61 assert!(
62 ret == 0,
63 "pipe() failed: {}",
64 std::io::Error::last_os_error()
65 );
66
67 unsafe {
70 set_nonblock_cloexec(fds[0]);
71 set_nonblock_cloexec(fds[1]);
72 }
73
74 Self {
75 read_fd: unsafe { OwnedFd::from_raw_fd(fds[0]) },
77 write_fd: unsafe { OwnedFd::from_raw_fd(fds[1]) },
78 }
79 }
80
81 #[cfg(windows)]
83 pub fn new() -> Self {
84 let handle = unsafe { CreateEventW(std::ptr::null(), 1, 0, std::ptr::null()) };
85 if handle.is_null() {
86 panic!("CreateEventW failed: {}", std::io::Error::last_os_error());
87 }
88 Self { handle }
89 }
90
91 #[cfg(unix)]
96 pub fn wake(&self) {
97 unsafe {
100 libc::write(self.write_fd.as_raw_fd(), [1u8].as_ptr().cast(), 1);
101 }
102 }
103
104 #[cfg(windows)]
106 pub fn wake(&self) {
107 unsafe {
108 SetEvent(self.handle);
109 }
110 }
111
112 #[cfg(unix)]
115 pub fn drain(&self) {
116 let mut buf = [0u8; 512];
117 loop {
118 let n =
120 unsafe { libc::read(self.read_fd.as_raw_fd(), buf.as_mut_ptr().cast(), buf.len()) };
121 if n <= 0 {
122 break;
123 }
124 }
125 }
126
127 #[cfg(windows)]
129 pub fn drain(&self) {
130 unsafe {
131 ResetEvent(self.handle);
132 }
133 }
134
135 pub fn wait_timeout(&self, timeout: Duration) -> bool {
139 wait_timeout(self, timeout)
140 }
141
142 #[cfg(unix)]
146 pub fn as_raw_fd(&self) -> RawFd {
147 self.read_fd.as_raw_fd()
148 }
149
150 #[cfg(windows)]
152 pub fn as_raw_handle(&self) -> RawHandle {
153 self.handle as RawHandle
154 }
155}
156
157impl Default for WakePipe {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
// SAFETY: the only field is the event handle created in `new`. It is used
// solely through `SetEvent`, `ResetEvent`, `WaitForSingleObject` and
// `CloseHandle`, none of which tie the handle to the creating thread.
// NOTE(review): `HANDLE` appears to be a raw pointer type here (it is checked
// with `is_null`), which would be why `Send` is not derived automatically;
// confirm against the `windows_sys` version in use.
#[cfg(windows)]
unsafe impl Send for WakePipe {}
169
// SAFETY: every `&self` method only passes the handle to `SetEvent`,
// `ResetEvent` or `WaitForSingleObject`; no Rust-side state is mutated
// through a shared reference, so shared access from several threads relies
// only on those OS calls being safe to issue concurrently on one event.
#[cfg(windows)]
unsafe impl Sync for WakePipe {}
172
#[cfg(windows)]
impl Drop for WakePipe {
    /// Releases the event handle held by this value.
    fn drop(&mut self) {
        // The status is discarded on purpose: `drop` has no way to report it.
        // SAFETY: `handle` was produced by `CreateEventW` in `new` and this is
        // the only place that closes it.
        let _ = unsafe { CloseHandle(self.handle) };
    }
}
181
182#[cfg(unix)]
192unsafe fn set_nonblock_cloexec(fd: RawFd) {
193 unsafe {
194 let flags = libc::fcntl(fd, libc::F_GETFL);
196 assert!(
197 flags >= 0,
198 "fcntl(F_GETFL) failed: {}",
199 std::io::Error::last_os_error()
200 );
201 let ret = libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
202 assert!(
203 ret >= 0,
204 "fcntl(F_SETFL) failed: {}",
205 std::io::Error::last_os_error()
206 );
207
208 let flags = libc::fcntl(fd, libc::F_GETFD);
210 assert!(
211 flags >= 0,
212 "fcntl(F_GETFD) failed: {}",
213 std::io::Error::last_os_error()
214 );
215 let ret = libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
216 assert!(
217 ret >= 0,
218 "fcntl(F_SETFD) failed: {}",
219 std::io::Error::last_os_error()
220 );
221 }
222}
223
224#[cfg(unix)]
225fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
226 let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32;
227 poll_fd_readable_timeout(pipe.as_raw_fd(), timeout_ms)
228}
229
230#[cfg(unix)]
231fn poll_fd_readable_timeout(fd: RawFd, timeout_ms: i32) -> bool {
232 loop {
233 let mut pfd = libc::pollfd {
234 fd,
235 events: libc::POLLIN,
236 revents: 0,
237 };
238 let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
240 if ret > 0 {
241 return true;
242 }
243 if ret == 0 {
244 return false;
245 }
246
247 let errno = std::io::Error::last_os_error();
248 if errno.raw_os_error() != Some(libc::EINTR) {
249 return false;
250 }
251 }
252}
253
/// Windows backend of [`WakePipe::wait_timeout`]: waits on the event handle
/// for at most `timeout`.
///
/// The timeout is rounded *up* to the next whole millisecond so that a
/// non-zero timeout below 1 ms does not become a non-blocking probe. Values
/// that do not fit in a `u32` saturate to `u32::MAX`, which
/// `WaitForSingleObject` interprets as an infinite wait.
///
/// Returns `true` only when the wait ends with `WAIT_OBJECT_0`; a timeout, a
/// failure, or any other result yields `false`.
#[cfg(windows)]
fn wait_timeout(pipe: &WakePipe, timeout: Duration) -> bool {
    let mut millis = timeout.as_millis();
    if timeout.subsec_nanos() % 1_000_000 != 0 {
        // A partial millisecond remains; never wait less than requested.
        millis += 1;
    }
    let timeout_ms = u32::try_from(millis).unwrap_or(u32::MAX);
    // SAFETY: `pipe.handle` is the event created in `WakePipe::new` and stays
    // open for as long as `pipe` is borrowed.
    let result = unsafe { WaitForSingleObject(pipe.handle, timeout_ms) };
    match result {
        WAIT_OBJECT_0 => true,
        WAIT_TIMEOUT | WAIT_FAILED => false,
        _ => false,
    }
}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Non-blocking probe: is a wake-up currently pending on `pipe`?
    fn is_signaled(pipe: &WakePipe) -> bool {
        pipe.wait_timeout(Duration::ZERO)
    }

    #[test]
    fn wake_and_drain() {
        let pipe = WakePipe::new();
        // Draining a pipe that was never woken must be harmless.
        pipe.drain();
        assert!(!is_signaled(&pipe));

        // Several wakes are cleared by a single drain.
        pipe.wake();
        pipe.wake();
        assert!(is_signaled(&pipe));
        pipe.drain();
        assert!(!is_signaled(&pipe));

        // The pipe is reusable after being drained.
        pipe.wake();
        assert!(is_signaled(&pipe));
        pipe.drain();
        assert!(!is_signaled(&pipe));
    }

    #[cfg(unix)]
    #[test]
    fn fd_is_valid() {
        let pipe = WakePipe::new();
        let fd = pipe.as_raw_fd();
        assert!(fd >= 0);
    }

    #[test]
    fn nonblocking_read() {
        let pipe = WakePipe::new();
        // With nothing to read, `drain` must return instead of blocking,
        // and must leave the pipe unsignaled.
        pipe.drain();
        pipe.drain();
        assert!(!is_signaled(&pipe));
    }

    #[test]
    fn wait_timeout_observes_wake() {
        let pipe = WakePipe::new();

        assert!(!pipe.wait_timeout(Duration::from_millis(1)));
        pipe.wake();
        assert!(pipe.wait_timeout(Duration::from_secs(1)));
        pipe.drain();
        assert!(!pipe.wait_timeout(Duration::from_millis(1)));
    }
}