#[cfg(windows)]
mod stream_impl {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use futures_util::Stream;
use tokio::sync::Notify;
use tokio::sync::futures::OwnedNotified;
use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0};
use windows_sys::Win32::System::Threading::{
CreateEventW, ResetEvent, SetEvent, WaitForMultipleObjects, INFINITE,
};
/// Stream that yields `()` whenever the named Win32 event is signalled.
///
/// A dedicated thread blocks in `WaitForMultipleObjects` on the data-ready
/// event and on a companion `<name>.shutdown` event, and forwards every
/// data-ready signal to the async side through a `Notify`.
pub(crate) struct NamedEventStream {
    /// Carries wake-ups from the waiter thread to the `Stream` implementation.
    notify: Arc<Notify>,
    /// The `notified` future currently being polled by `poll_next`.
    notified: Option<Pin<Box<OwnedNotified>>>,
    /// Waiter thread; taken and joined by `Drop` before the handles are closed.
    thread: Option<JoinHandle<()>>,
    /// Event handles, closed by `EventHandles::drop` once the stream's own
    /// `Drop` has finished.
    handles: EventHandles,
}

/// Owns the two Win32 event handles used by the waiter thread.
struct EventHandles {
    data_ready: HANDLE,
    shutdown: HANDLE,
}

impl EventHandles {
    /// Creates the data-ready event named `event_name` and its companion
    /// `<event_name>.shutdown` event. Returns `None` if either call fails; a
    /// handle that was already created is closed by `Drop` on the way out.
    fn create(event_name: &str) -> Option<Self> {
        let mut handles = Self {
            data_ready: Self::create_event(event_name)?,
            shutdown: 0,
        };
        handles.shutdown = Self::create_event(&format!(r"{}.shutdown", event_name))?;
        Some(handles)
    }

    /// Calls `CreateEventW` for a manual-reset, initially non-signalled named
    /// event. Returns `None` when the call yields a zero handle.
    fn create_event(name: &str) -> Option<HANDLE> {
        let wide: Vec<u16> = name.encode_utf16().chain(std::iter::once(0)).collect();
        // SAFETY: `wide` is a NUL-terminated UTF-16 buffer that outlives the
        // call; a null attributes pointer requests default security.
        let handle = unsafe { CreateEventW(std::ptr::null_mut(), 1, 0, wide.as_ptr()) };
        if handle == 0 {
            None
        } else {
            Some(handle)
        }
    }

    /// Forgets both handles so that `Drop` will not close them.
    fn leak(&mut self) {
        self.data_ready = 0;
        self.shutdown = 0;
    }
}

impl Drop for EventHandles {
    fn drop(&mut self) {
        if self.shutdown != 0 {
            // SAFETY: the handle is non-zero and owned exclusively by `self`.
            unsafe { CloseHandle(self.shutdown) };
            self.shutdown = 0;
        }
        if self.data_ready != 0 {
            // SAFETY: the handle is non-zero and owned exclusively by `self`.
            unsafe { CloseHandle(self.data_ready) };
            self.data_ready = 0;
        }
    }
}

impl NamedEventStream {
    /// Opens the events for `event_name` and starts the waiter thread.
    ///
    /// Returns `None` when either event cannot be created or the thread
    /// cannot be spawned.
    pub(crate) fn new(event_name: &str) -> Option<Self> {
        let handles = EventHandles::create(event_name)?;
        let notify = Arc::new(Notify::new());
        let worker_notify = Arc::clone(&notify);
        let (shutdown, data_ready) = (handles.shutdown, handles.data_ready);
        let worker = thread::Builder::new()
            .spawn(move || {
                // Shutdown is listed first: when several handles are signalled
                // the lowest index is reported, so a busy data-ready event can
                // never starve the shutdown request.
                let wait_set = [shutdown, data_ready];
                loop {
                    // SAFETY: both handles stay open until `Drop` has joined
                    // this thread (or are deliberately leaked).
                    let woke =
                        unsafe { WaitForMultipleObjects(2, wait_set.as_ptr(), 0, INFINITE) };
                    if woke != WAIT_OBJECT_0 + 1 {
                        // Shutdown requested, or the wait itself failed.
                        break;
                    }
                    // Reset before notifying: a signal raised after the reset
                    // re-arms the event instead of being erased once the
                    // consumer has already been woken.
                    // SAFETY: see above.
                    let rearmed = unsafe { ResetEvent(data_ready) } != 0;
                    worker_notify.notify_one();
                    if !rearmed {
                        break;
                    }
                }
            })
            .ok()?;
        let notified = Some(Box::pin(Arc::clone(&notify).notified_owned()));
        Some(Self {
            notify,
            notified,
            thread: Some(worker),
            handles,
        })
    }
}

impl Drop for NamedEventStream {
    /// Signals shutdown and joins the waiter thread so the handles are never
    /// closed while a wait on them is still pending.
    fn drop(&mut self) {
        // NOTE(review): the shutdown event is named and manual-reset, so this
        // presumably also wakes any other opener of `<name>.shutdown` — confirm
        // that is intended.
        // SAFETY: `shutdown` is a live handle owned by `self.handles`.
        let signalled = unsafe { SetEvent(self.handles.shutdown) } != 0;
        match self.thread.take() {
            Some(worker) if signalled => {
                // The thread leaves its wait as soon as it observes shutdown.
                let _ = worker.join();
            }
            // The thread could not be told to stop: leaking the handles is
            // preferable to closing them underneath a pending wait.
            _ => self.handles.leak(),
        }
    }
}
/// Yields `()` each time the waiter thread has called `notify_one`; never
/// yields `None`.
impl Stream for NamedEventStream {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Poll the stored `notified` future in place; a missing future is
        // treated as "not ready".
        let fired = match self.notified.as_mut() {
            Some(pending) => std::future::Future::poll(pending.as_mut(), cx).is_ready(),
            None => false,
        };
        if !fired {
            return Poll::Pending;
        }
        // Arm a fresh future for the next notification before reporting this one.
        let next_wait = Arc::clone(&self.notify).notified_owned();
        self.notified = Some(Box::pin(next_wait));
        Poll::Ready(Some(()))
    }
}
}
#[cfg(windows)]
/// Windows entry point: builds a `NamedEventStream` for `event_name`.
///
/// Delegates to `stream_impl::NamedEventStream::new` and returns `None`
/// whenever that constructor does.
pub(crate) fn named_event_stream(event_name: &str) -> Option<stream_impl::NamedEventStream> {
stream_impl::NamedEventStream::new(event_name)
}
#[cfg(unix)]
mod stream_impl {
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::mpsc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use futures_util::Stream;
use tokio::sync::Notify;
use tokio::sync::futures::OwnedNotified;
/// Stream that yields `()` each time the background thread observes a post
/// on the named semaphore.
pub(crate) struct NamedEventStream {
// Wake-up channel from the background thread to `poll_next`.
notify: Arc<Notify>,
// The `notified` future currently being polled by `poll_next`.
notified: Option<Pin<Box<OwnedNotified>>>,
// Set to `true` on drop; read by the background thread to decide when to exit.
shutdown: Arc<AtomicBool>,
// Handle of the background thread; it is never joined, so the thread is
// detached when the stream is dropped.
_thread: JoinHandle<()>,
}
impl NamedEventStream {
pub(crate) fn new(event_name: &str) -> Option<Self> {
let event_name = event_name.to_string();
let (tx, rx) = mpsc::channel::<Result<(), ()>>();
let notify = Arc::new(Notify::new());
let notify_clone = Arc::clone(¬ify);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = Arc::clone(&shutdown);
const POLL_MS: u64 = 100;
let _thread = thread::spawn(move || {
let mut sem = match named_sem::NamedSemaphore::create(&event_name, 0) {
Ok(s) => s,
Err(_) => {
let _ = tx.send(Err(()));
return;
}
};
let _ = tx.send(Ok(()));
loop {
match sem.timed_wait(Duration::from_millis(POLL_MS)) {
Ok(()) => {
notify_clone.notify_one();
}
Err(named_sem::Error::WaitTimeout) => {
if shutdown_clone.load(Ordering::Relaxed) {
break;
}
}
Err(_) => break,
}
}
});
if rx.recv() != Ok(Ok(())) {
return None;
}
let notified = Some(Box::pin(Arc::clone(¬ify).notified_owned()));
Some(Self {
notify,
notified,
shutdown,
_thread,
})
}
}
/// Asks the background thread to stop; the thread is not joined here.
impl Drop for NamedEventStream {
fn drop(&mut self) {
// The thread reads this flag with a relaxed load between semaphore waits.
self.shutdown.store(true, Ordering::Relaxed);
}
}
/// Yields `()` each time the background thread has called `notify_one`;
/// never yields `None`.
impl Stream for NamedEventStream {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Poll the stored `notified` future in place; a missing future is
        // treated as "not ready".
        let fired = match self.notified.as_mut() {
            Some(pending) => std::future::Future::poll(pending.as_mut(), cx).is_ready(),
            None => false,
        };
        if !fired {
            return Poll::Pending;
        }
        // Arm a fresh future for the next notification before reporting this one.
        let next_wait = Arc::clone(&self.notify).notified_owned();
        self.notified = Some(Box::pin(next_wait));
        Poll::Ready(Some(()))
    }
}
}
#[cfg(unix)]
/// Unix entry point: builds a `NamedEventStream` for `event_name`.
///
/// Delegates to `stream_impl::NamedEventStream::new` and returns `None`
/// whenever that constructor does.
pub(crate) fn named_event_stream(event_name: &str) -> Option<stream_impl::NamedEventStream> {
stream_impl::NamedEventStream::new(event_name)
}
/// Reasons `wait_for_ack` can fail.
#[derive(Debug)]
pub(crate) enum AckWaitError {
/// The wait ended because the timeout elapsed.
Timeout,
/// The named object could not be created or opened. `wait_for_ack` also
/// returns this for any wait failure other than a timeout.
CreateOpenFailed,
}
/// Waits up to `timeout_ms` milliseconds for the named event
/// `ack_event_name` to become signalled (Windows build).
///
/// # Errors
/// * `AckWaitError::CreateOpenFailed` — `CreateEventW` returned a zero
///   handle, or the wait ended with anything other than success or timeout.
/// * `AckWaitError::Timeout` — the wait returned the timeout code.
#[cfg(windows)]
pub(crate) fn wait_for_ack(ack_event_name: &str, timeout_ms: u32) -> Result<(), AckWaitError> {
    use windows_sys::Win32::Foundation::{CloseHandle, WAIT_OBJECT_0};
    use windows_sys::Win32::System::Threading::{CreateEventW, WaitForSingleObject};

    /// Return code of `WaitForSingleObject` when the interval elapses.
    const WAIT_TIMEOUT: u32 = 258;

    let mut wide_name: Vec<u16> = ack_event_name.encode_utf16().collect();
    wide_name.push(0);

    // Manual-reset (1), initially non-signalled (0), default security attributes.
    let ack_event = unsafe { CreateEventW(std::ptr::null_mut(), 1, 0, wide_name.as_ptr()) };
    if ack_event == 0 {
        return Err(AckWaitError::CreateOpenFailed);
    }

    let outcome = unsafe { WaitForSingleObject(ack_event, timeout_ms) };
    // The handle is only needed for the wait itself.
    unsafe { CloseHandle(ack_event) };

    if outcome == WAIT_OBJECT_0 {
        Ok(())
    } else if outcome == WAIT_TIMEOUT {
        Err(AckWaitError::Timeout)
    } else {
        Err(AckWaitError::CreateOpenFailed)
    }
}
#[cfg(unix)]
pub(crate) fn wait_for_ack(ack_event_name: &str, timeout_ms: u32) -> Result<(), AckWaitError> {
use std::time::Duration;
let mut sem = named_sem::NamedSemaphore::create(ack_event_name, 0)
.map_err(|_| AckWaitError::CreateOpenFailed)?;
match sem.timed_wait(Duration::from_millis(timeout_ms as u64)) {
Ok(()) => Ok(()),
Err(named_sem::Error::WaitTimeout) => Err(AckWaitError::Timeout),
Err(_) => Err(AckWaitError::CreateOpenFailed),
}
}
#[cfg(all(test, unix))]
mod tests {
    use std::time::Duration;
    use futures_util::StreamExt;
    use super::*;
    use crate::event_sender;

    /// The generated name must be a leading-slash name with no further
    /// slashes and must mention `data_ready`.
    #[test]
    fn data_ready_event_name_linux_format() {
        let generated = event_sender::data_ready_event_name("mypipe");
        assert!(generated.starts_with('/'), "POSIX semaphore name must start with /");
        let slash_count = generated.matches('/').count();
        assert_eq!(slash_count, 1, "name must contain no further slashes");
        assert!(generated.contains("data_ready"));
    }

    /// A signal sent from another thread makes the stream yield one item.
    #[test]
    fn named_semaphore_stream_yields_on_signal() {
        // The process id keeps the name unique per test process.
        let pipe_id = format!("test_{}", std::process::id());
        let event_name = event_sender::data_ready_event_name(&pipe_id);
        let mut stream = named_event_stream(&event_name)
            .expect("named_event_stream should succeed with valid name");

        let (done_tx, done_rx) = std::sync::mpsc::channel();
        let signal_target = event_name.clone();
        std::thread::spawn(move || {
            // Give the main thread time to start awaiting the stream.
            std::thread::sleep(Duration::from_millis(50));
            event_sender::signal_named_event(&signal_target);
            let _ = done_tx.send(());
        });

        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let first = runtime.block_on(stream.next());
        assert_eq!(first, Some(()));

        // Best effort: let the signalling thread finish before the test returns.
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
}