1use std::io;
2use std::time::Duration;
3
4#[cfg(unix)]
5use std::os::unix::io::RawFd;
6
#[cfg(unix)]
/// Two-way wake-up channel between the DAW side and the host side (unix
/// implementation), backed by two `pipe(2)` descriptor pairs.
///
/// In each array, index 0 is the read end and index 1 is the write end, which
/// is the order `pipe(2)` fills them in. A slot holding `-1` is a descriptor
/// this value has already closed or never owned.
pub struct EventPair {
    /// Pipe carrying wake-ups from the DAW side to the host side.
    daw_to_host: [RawFd; 2],
    /// Pipe carrying wake-ups from the host side to the DAW side.
    host_to_daw: [RawFd; 2],
}
16
#[cfg(windows)]
/// Two-way wake-up channel between the DAW side and the host side (Windows
/// implementation), backed by two named kernel event objects.
pub struct EventPair {
    /// Event handle signalled by the DAW side and waited on by the host side.
    daw_to_host: *mut std::ffi::c_void,
    /// Event handle signalled by the host side and waited on by the DAW side.
    host_to_daw: *mut std::ffi::c_void,
    /// Object name the `daw_to_host` event was created or opened under.
    daw_to_host_name: String,
    /// Object name the `host_to_daw` event was created or opened under.
    host_to_daw_name: String,
}
24
// SAFETY: `EventPair` only stores OS handles (raw file descriptors on unix,
// event `HANDLE`s on Windows) and never dereferences them itself; every use
// goes through a system call.
// NOTE(review): soundness relies on those OS objects being usable from any
// thread and on callers coordinating the `&mut self` close methods - confirm.
// These impls are not cfg-gated, so they presuppose that one of the
// `cfg(unix)` / `cfg(windows)` struct definitions is compiled in.
unsafe impl Send for EventPair {}
unsafe impl Sync for EventPair {}
27
28#[cfg(unix)]
29impl EventPair {
30 pub fn new() -> io::Result<Self> {
32 let mut daw_to_host = [0; 2];
33 let mut host_to_daw = [0; 2];
34 if unsafe { libc::pipe(daw_to_host.as_mut_ptr()) } != 0 {
35 return Err(io::Error::last_os_error());
36 }
37 if unsafe { libc::pipe(host_to_daw.as_mut_ptr()) } != 0 {
38 unsafe {
39 libc::close(daw_to_host[0]);
40 libc::close(daw_to_host[1]);
41 }
42 return Err(io::Error::last_os_error());
43 }
44 Ok(Self {
45 daw_to_host,
46 host_to_daw,
47 })
48 }
49
50 pub unsafe fn from_fds(daw_to_host_read: RawFd, host_to_daw_write: RawFd) -> Self {
54 let mut pair = Self {
55 daw_to_host: [daw_to_host_read, -1],
56 host_to_daw: [-1, host_to_daw_write],
57 };
58 pair.close_host_unused();
59 pair
60 }
61
62 pub fn daw_write_fd(&self) -> RawFd {
63 self.daw_to_host[1]
64 }
65
66 pub fn daw_read_fd(&self) -> RawFd {
67 self.host_to_daw[0]
68 }
69
70 pub fn host_read_fd(&self) -> RawFd {
71 self.daw_to_host[0]
72 }
73
74 pub fn host_write_fd(&self) -> RawFd {
75 self.host_to_daw[1]
76 }
77
78 pub fn signal_host(&self) -> io::Result<()> {
80 write_byte(self.daw_to_host[1])
81 }
82
83 pub fn wait_host(&self, timeout: Duration) -> io::Result<()> {
85 read_byte(self.host_to_daw[0], timeout)
86 }
87
88 pub fn wait_daw(&self, timeout: Duration) -> io::Result<()> {
90 read_byte(self.daw_to_host[0], timeout)
91 }
92
93 pub fn signal_daw(&self) -> io::Result<()> {
95 write_byte(self.host_to_daw[1])
96 }
97
98 pub fn close_daw_unused(&mut self) {
100 unsafe {
101 libc::close(self.daw_to_host[0]);
102 libc::close(self.host_to_daw[1]);
103 }
104 self.daw_to_host[0] = -1;
105 self.host_to_daw[1] = -1;
106 }
107
108 pub fn close_host_unused(&mut self) {
110 unsafe {
111 libc::close(self.daw_to_host[1]);
112 libc::close(self.host_to_daw[0]);
113 }
114 self.daw_to_host[1] = -1;
115 self.host_to_daw[0] = -1;
116 }
117}
118
119#[cfg(unix)]
120impl Drop for EventPair {
121 fn drop(&mut self) {
122 unsafe {
123 libc::close(self.daw_to_host[0]);
124 libc::close(self.daw_to_host[1]);
125 libc::close(self.host_to_daw[0]);
126 libc::close(self.host_to_daw[1]);
127 }
128 }
129}
130
131#[cfg(unix)]
132fn write_byte(fd: RawFd) -> io::Result<()> {
133 let buf = [1u8];
134 let n = unsafe { libc::write(fd, buf.as_ptr().cast(), 1) };
135 if n < 0 {
136 Err(io::Error::last_os_error())
137 } else {
138 Ok(())
139 }
140}
141
142#[cfg(unix)]
143fn read_byte(fd: RawFd, timeout: Duration) -> io::Result<()> {
144 let mut pfd = libc::pollfd {
145 fd,
146 events: libc::POLLIN,
147 revents: 0,
148 };
149 let ms = timeout.as_millis().clamp(0, i32::MAX as u128) as i32;
150 let rc = unsafe { libc::poll(&mut pfd, 1, ms) };
151 if rc < 0 {
152 return Err(io::Error::last_os_error());
153 }
154 if rc == 0 {
155 return Err(io::Error::new(io::ErrorKind::TimedOut, "poll timeout"));
156 }
157 let mut buf = [0u8; 1];
158 let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 1) };
159 if n < 0 {
160 Err(io::Error::last_os_error())
161 } else if n == 0 {
162 Err(io::Error::new(
163 io::ErrorKind::UnexpectedEof,
164 "event peer closed the pipe",
165 ))
166 } else {
167 Ok(())
168 }
169}
170
#[cfg(windows)]
impl EventPair {
    /// Creates two fresh named events, one per direction.
    ///
    /// Both events are auto-reset and start non-signalled. Their names live
    /// in the `Local\` namespace and embed the current process id plus a
    /// nanosecond timestamp; they are available through
    /// [`daw_to_host_name`](Self::daw_to_host_name) and
    /// [`host_to_daw_name`](Self::host_to_daw_name) so the peer can open them
    /// with [`from_names`](Self::from_names).
    ///
    /// # Errors
    ///
    /// Returns an error carrying the `GetLastError` code when `CreateEventW`
    /// fails. If the second event cannot be created the first one is closed
    /// again.
    pub fn new() -> io::Result<Self> {
        use windows_sys::Win32::Foundation::CloseHandle;
        use windows_sys::Win32::System::Threading::CreateEventW;

        let pid = std::process::id();
        let nonce = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let daw_to_host_name = format!("Local\\maolan-d2h-{}-{}", pid, nonce);
        let host_to_daw_name = format!("Local\\maolan-h2d-{}-{}", pid, nonce);

        let d2h_wide = Self::to_wide(&daw_to_host_name);
        let h2d_wide = Self::to_wide(&host_to_daw_name);

        // SAFETY: the name pointer refers to a live, NUL-terminated UTF-16
        // buffer; null security attributes are permitted.
        let daw_to_host = unsafe { CreateEventW(std::ptr::null_mut(), 0, 0, d2h_wide.as_ptr()) };
        if daw_to_host.is_null() {
            return Err(Self::last_error("CreateEventW"));
        }
        // SAFETY: same as above.
        let host_to_daw = unsafe { CreateEventW(std::ptr::null_mut(), 0, 0, h2d_wide.as_ptr()) };
        if host_to_daw.is_null() {
            // Read the error code *before* cleaning up: `CloseHandle` may
            // replace the thread's last-error value.
            let err = Self::last_error("CreateEventW");
            // SAFETY: `daw_to_host` was checked to be a valid handle above.
            unsafe { CloseHandle(daw_to_host) };
            return Err(err);
        }

        Ok(Self {
            daw_to_host,
            host_to_daw,
            daw_to_host_name,
            host_to_daw_name,
        })
    }

    /// Opens two existing named events, typically the ones created by the
    /// peer through [`new`](Self::new).
    ///
    /// # Errors
    ///
    /// Returns an error carrying the `GetLastError` code when `OpenEventW`
    /// fails. If the second event cannot be opened the first handle is closed
    /// again.
    pub fn from_names(daw_to_host_name: &str, host_to_daw_name: &str) -> io::Result<Self> {
        use windows_sys::Win32::Foundation::CloseHandle;
        use windows_sys::Win32::System::Threading::{EVENT_ALL_ACCESS, OpenEventW};

        let d2h_wide = Self::to_wide(daw_to_host_name);
        let h2d_wide = Self::to_wide(host_to_daw_name);

        // SAFETY: the name pointer refers to a live, NUL-terminated UTF-16
        // buffer.
        let daw_to_host = unsafe { OpenEventW(EVENT_ALL_ACCESS, 0, d2h_wide.as_ptr()) };
        if daw_to_host.is_null() {
            return Err(Self::last_error("OpenEventW"));
        }
        // SAFETY: same as above.
        let host_to_daw = unsafe { OpenEventW(EVENT_ALL_ACCESS, 0, h2d_wide.as_ptr()) };
        if host_to_daw.is_null() {
            // Read the error code *before* cleaning up: `CloseHandle` may
            // replace the thread's last-error value.
            let err = Self::last_error("OpenEventW");
            // SAFETY: `daw_to_host` was checked to be a valid handle above.
            unsafe { CloseHandle(daw_to_host) };
            return Err(err);
        }

        Ok(Self {
            daw_to_host,
            host_to_daw,
            daw_to_host_name: daw_to_host_name.to_string(),
            host_to_daw_name: host_to_daw_name.to_string(),
        })
    }

    /// Object name of the DAW -> host event.
    pub fn daw_to_host_name(&self) -> &str {
        &self.daw_to_host_name
    }

    /// Object name of the host -> DAW event.
    pub fn host_to_daw_name(&self) -> &str {
        &self.host_to_daw_name
    }

    /// Wakes the host side by setting the DAW -> host event.
    ///
    /// # Errors
    ///
    /// Returns an error carrying the `GetLastError` code when `SetEvent`
    /// fails.
    pub fn signal_host(&self) -> io::Result<()> {
        Self::set_event(self.daw_to_host)
    }

    /// Waits up to `timeout` for the host -> DAW event, i.e. for a wake-up
    /// sent by the host side.
    ///
    /// # Errors
    ///
    /// [`io::ErrorKind::TimedOut`] on timeout, otherwise an error carrying
    /// the `GetLastError` code.
    pub fn wait_host(&self, timeout: Duration) -> io::Result<()> {
        self.wait_object(self.host_to_daw, timeout)
    }

    /// Waits up to `timeout` for the DAW -> host event, i.e. for a wake-up
    /// sent by the DAW side.
    ///
    /// # Errors
    ///
    /// [`io::ErrorKind::TimedOut`] on timeout, otherwise an error carrying
    /// the `GetLastError` code.
    pub fn wait_daw(&self, timeout: Duration) -> io::Result<()> {
        self.wait_object(self.daw_to_host, timeout)
    }

    /// Like [`wait_daw`](Self::wait_daw), but keeps dispatching the calling
    /// thread's window messages while waiting.
    ///
    /// Each time `MsgWaitForMultipleObjects` reports pending input, the
    /// thread's message queue is drained with `PeekMessageW` /
    /// `TranslateMessage` / `DispatchMessageW`, and the wait resumes with
    /// whatever part of `timeout` is left.
    ///
    /// # Errors
    ///
    /// [`io::ErrorKind::TimedOut`] on timeout, otherwise an error carrying
    /// the `GetLastError` code.
    pub fn wait_daw_with_message_pump(&self, timeout: Duration) -> io::Result<()> {
        use windows_sys::Win32::Foundation::{WAIT_OBJECT_0, WAIT_TIMEOUT};
        use windows_sys::Win32::UI::WindowsAndMessaging::{
            DispatchMessageW, MSG, MsgWaitForMultipleObjects, PM_REMOVE, PeekMessageW, QS_ALLINPUT,
            TranslateMessage,
        };

        let start = std::time::Instant::now();
        let handles = [self.daw_to_host];
        let ms_total = timeout.as_millis().min(u32::MAX as u128) as u32;

        loop {
            let elapsed = start.elapsed().as_millis().min(u32::MAX as u128) as u32;
            let remaining = ms_total.saturating_sub(elapsed);

            // SAFETY: `handles` holds exactly one handle, matching the count.
            let rc = unsafe {
                MsgWaitForMultipleObjects(1, handles.as_ptr(), 0, remaining, QS_ALLINPUT)
            };

            if rc == WAIT_OBJECT_0 {
                // The event itself was signalled.
                return Ok(());
            } else if rc == WAIT_OBJECT_0 + 1 {
                // Index == handle count: input is pending, not the event.
                // SAFETY: `MSG` is plain data for which all-zero is a valid
                // value, and `msg` outlives every call that borrows it.
                unsafe {
                    let mut msg: MSG = std::mem::zeroed();
                    while PeekMessageW(&mut msg, std::ptr::null_mut(), 0, 0, PM_REMOVE) != 0 {
                        TranslateMessage(&msg);
                        DispatchMessageW(&msg);
                    }
                }
            } else if rc == WAIT_TIMEOUT {
                return Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    "MsgWaitForMultipleObjects timeout",
                ));
            } else {
                return Err(Self::last_error("MsgWaitForMultipleObjects"));
            }
        }
    }

    /// Wakes the DAW side by setting the host -> DAW event.
    ///
    /// # Errors
    ///
    /// Returns an error carrying the `GetLastError` code when `SetEvent`
    /// fails.
    pub fn signal_daw(&self) -> io::Result<()> {
        Self::set_event(self.host_to_daw)
    }

    /// No-op on Windows: both sides keep both event handles. Present so the
    /// API matches the unix implementation.
    pub fn close_daw_unused(&mut self) {}

    /// No-op on Windows: both sides keep both event handles. Present so the
    /// API matches the unix implementation.
    pub fn close_host_unused(&mut self) {}

    /// Blocks on `handle` with `WaitForSingleObject`, with the timeout capped
    /// at `u32::MAX` milliseconds.
    fn wait_object(&self, handle: *mut std::ffi::c_void, timeout: Duration) -> io::Result<()> {
        use windows_sys::Win32::Foundation::{WAIT_OBJECT_0, WAIT_TIMEOUT};
        use windows_sys::Win32::System::Threading::WaitForSingleObject;

        let ms = timeout.as_millis().min(u32::MAX as u128) as u32;
        // SAFETY: `handle` is one of the event handles owned by `self`.
        let rc = unsafe { WaitForSingleObject(handle, ms) };
        if rc == WAIT_OBJECT_0 {
            Ok(())
        } else if rc == WAIT_TIMEOUT {
            Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "WaitForSingleObject timeout",
            ))
        } else {
            Err(Self::last_error("WaitForSingleObject"))
        }
    }

    /// Sets `handle` to the signalled state.
    fn set_event(handle: *mut std::ffi::c_void) -> io::Result<()> {
        use windows_sys::Win32::System::Threading::SetEvent;

        // SAFETY: callers pass one of the event handles owned by the pair.
        if unsafe { SetEvent(handle) } == 0 {
            return Err(Self::last_error("SetEvent"));
        }
        Ok(())
    }

    /// Encodes `text` as NUL-terminated UTF-16 for the wide-character Win32
    /// APIs.
    fn to_wide(text: &str) -> Vec<u16> {
        text.encode_utf16().chain(std::iter::once(0)).collect()
    }

    /// Builds the "`<api> failed: <code>`" error from the calling thread's
    /// current last-error value. Must be called before any other Win32 call
    /// that could overwrite that value.
    fn last_error(api: &str) -> io::Error {
        use windows_sys::Win32::Foundation::GetLastError;

        // SAFETY: `GetLastError` has no preconditions.
        let code = unsafe { GetLastError() };
        io::Error::new(io::ErrorKind::Other, format!("{} failed: {}", api, code))
    }
}
386
#[cfg(windows)]
impl Drop for EventPair {
    /// Releases both event handles, skipping any that is null.
    fn drop(&mut self) {
        use windows_sys::Win32::Foundation::CloseHandle;

        // Same order as the fields: DAW -> host first, then host -> DAW.
        for handle in [self.daw_to_host, self.host_to_daw] {
            if handle.is_null() {
                continue;
            }
            // SAFETY: a non-null handle stored in the pair is owned by it, and
            // the pair is being destroyed, so the handle is not used again.
            unsafe { CloseHandle(handle) };
        }
    }
}