use futures::{stream::FusedStream, task::AtomicWaker, Stream};
use std::{
ffi::c_void,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
};
use windows::{
core::Result,
Win32::{
Foundation::{CloseHandle, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, ResetEvent, UnregisterWaitEx, INFINITE,
WT_EXECUTEINWAITTHREAD,
},
},
};
/// A [`Stream`] that yields `Ok(())` whenever the registered wait callback has
/// set the `ready` flag since the previous poll. It never yields `None`.
pub struct Win32EventStream {
// Never read after construction: the field exists only to own the wait
// registration, whose `Drop` impl tears it down together with the stream.
#[allow(dead_code)]
notif: Win32EventNotification,
// Waker shared with the wait callback; `poll_next` registers the current
// task here and the callback calls `wake()` on it.
waker: Arc<AtomicWaker>,
// Flag shared with the wait callback: the callback stores `true`,
// `poll_next` swaps it back to `false` when it yields an item.
ready: Arc<AtomicBool>,
}
impl Win32EventStream {
pub fn new(event_handle: HANDLE) -> Result<Self> {
let waker = Arc::new(AtomicWaker::new());
let ready = Arc::new(AtomicBool::new(false));
let event_handle_raw = event_handle.0 as usize;
Ok(Self {
waker: waker.clone(),
ready: ready.clone(),
notif: Win32EventNotification::new(
event_handle,
Box::new(move |_| {
ready.store(true, Ordering::SeqCst);
waker.wake();
let handle = HANDLE(event_handle_raw as *mut c_void);
let _ = unsafe { ResetEvent(handle) };
}),
)?,
})
}
}
impl Stream for Win32EventStream {
    type Item = Result<()>;

    /// Yields `Ok(())` if the wait callback has fired since the last item was
    /// produced, otherwise registers the task's waker and returns `Pending`.
    ///
    /// The stream never yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Fast path: skip waker registration when an event is already pending.
        if this.ready.swap(false, Ordering::SeqCst) {
            return Poll::Ready(Some(Ok(())));
        }

        this.waker.register(cx.waker());

        // The flag must be re-checked AFTER registering. The callback may have
        // fired between the first check and `register`, in which case its
        // `wake()` hit a stale (or missing) waker and nothing would ever poll
        // this stream again.
        if this.ready.swap(false, Ordering::SeqCst) {
            Poll::Ready(Some(Ok(())))
        } else {
            Poll::Pending
        }
    }
}
// `poll_next` for this stream only ever returns `Ready(Some(..))` or `Pending`,
// so the stream is never in a terminated state.
impl FusedStream for Win32EventStream {
/// Always returns `false`: this stream never reports termination.
fn is_terminated(&self) -> bool {
false
}
}
/// A wait registration created with `RegisterWaitForSingleObject`, bundled
/// with the heap-allocated callback that the registration invokes.
struct Win32EventNotification {
    /// The event handle the wait was registered on.
    win32_event: HANDLE,
    /// The wait handle written by `RegisterWaitForSingleObject`.
    wait_object: HANDLE,
    /// Raw pointer obtained from `Box::into_raw` in `new`; it is passed to the
    /// OS as the callback context and turned back into a `Box` on drop.
    callback: *mut Win32EventCallback,
}
impl std::fmt::Debug for Win32EventNotification {
    /// Writes the type name followed by the `Debug` form of the wait handle.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let wait = &self.wait_object;
        f.write_fmt(format_args!("Win32EventNotification: {:?}", wait))
    }
}
/// Boxed callback run by the wait registration. The `bool` argument is the
/// `time_out` value that `global_callback` receives and forwards unchanged.
type Win32EventCallback = Box<dyn Fn(bool) + Send>;
impl Win32EventNotification {
    /// Registers `cb` to be called whenever the wait on `win32_event` fires.
    ///
    /// The callback is moved to the heap and its raw pointer handed to
    /// `RegisterWaitForSingleObject` as the context argument. The wait is
    /// registered with an `INFINITE` timeout and `WT_EXECUTEINWAITTHREAD`.
    ///
    /// # Errors
    ///
    /// Returns the error from `RegisterWaitForSingleObject`; in that case the
    /// heap-allocated callback is freed before returning.
    fn new(win32_event: HANDLE, cb: Win32EventCallback) -> Result<Self> {
        // Trampoline with the ABI the OS expects: it recovers the boxed closure
        // from the context pointer and forwards the timeout flag to it.
        unsafe extern "system" fn global_callback(caller_context: *mut c_void, time_out: bool) {
            let user_cb = caller_context as *mut Win32EventCallback;
            // SAFETY: `caller_context` is the pointer produced by
            // `Box::into_raw` below, which stays allocated until `Drop`.
            unsafe { (**user_cb)(time_out) }
        }

        // Double boxing yields a thin pointer that fits in a `*mut c_void`.
        let callback = Box::into_raw(Box::new(cb));
        let mut wait_object = HANDLE(std::ptr::null_mut());

        let registration = unsafe {
            RegisterWaitForSingleObject(
                &mut wait_object,
                win32_event,
                Some(global_callback),
                Some(callback as *const c_void),
                INFINITE,
                WT_EXECUTEINWAITTHREAD,
            )
        };

        if let Err(e) = registration {
            // Registration failed, so the OS never saw the pointer: reclaim it.
            drop(unsafe { Box::from_raw(callback) });
            return Err(e);
        }

        Ok(Self {
            win32_event,
            wait_object,
            callback,
        })
    }
}
impl Drop for Win32EventNotification {
fn drop(&mut self) {
unsafe {
if UnregisterWaitEx(self.wait_object, Some(self.win32_event)).is_err() {
}
drop(Box::from_raw(self.callback)); }
let _ = unsafe {
CloseHandle(self.win32_event)
};
}
}
// Needed because the struct holds raw pointers (`HANDLE` fields and
// `*mut Win32EventCallback`), which are `!Send` by default. The boxed callback
// itself is declared `Send` by the `Win32EventCallback` alias.
// NOTE(review): this presumes the two handles may be used and closed from any
// thread — confirm against the Win32 documentation.
unsafe impl Send for Win32EventNotification {}