ipcez 0.1.0

Rust library for ipcez.
Documentation
//! Async stream that yields when a named Windows event or POSIX semaphore is signaled.
//! Creates/opens the event or semaphore and waits on it (without blocking the async executor);
//! used by the message handler for local transports to wake on "data ready".

#[cfg(windows)]
mod stream_impl {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::thread::{self, JoinHandle};

    use futures_util::Stream;
    use tokio::sync::Notify;
    use tokio::sync::futures::OwnedNotified;
    use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0};
    use windows_sys::Win32::System::Threading::{
        CreateEventW, ResetEvent, SetEvent, WaitForMultipleObjects, INFINITE,
    };

    /// Async stream that yields one item each time the named Windows event is signaled.
    ///
    /// A dedicated thread waits on the event (without blocking the async executor). For every
    /// signal it first resets the manual-reset event and then notifies the stream, so a signal
    /// that arrives while the consumer is being woken leaves the event set and produces another
    /// wake-up instead of being erased. The stream yields `()` for each notification.
    pub(crate) struct NamedEventStream {
        /// Wake-up channel from the waiter thread to `poll_next`.
        notify: Arc<Notify>,
        /// Pending notification future; replaced with a fresh one each time it completes.
        notified: Pin<Box<OwnedNotified>>,
        /// Waiter thread; `Some` until `Drop` takes it out in order to join it.
        thread: Option<JoinHandle<()>>,
        /// Event handles. Dropped (and therefore closed) only after `Drop for NamedEventStream`
        /// has stopped and joined the waiter thread.
        handles: EventHandles,
    }

    /// Owns the two event handles used by the waiter thread and closes them on drop.
    struct EventHandles {
        /// Named manual-reset event signaled by the sender when data is ready.
        data_ready: HANDLE,
        /// Unnamed, process-private manual-reset event used to stop the waiter thread.
        shutdown: HANDLE,
    }

    impl Drop for EventHandles {
        /// Closes both handles. The owner must make sure no thread is still waiting on them.
        fn drop(&mut self) {
            if self.shutdown != 0 {
                // SAFETY: `shutdown` is a handle returned by `CreateEventW` and not yet closed.
                unsafe { CloseHandle(self.shutdown) };
                self.shutdown = 0;
            }
            if self.data_ready != 0 {
                // SAFETY: `data_ready` is a handle returned by `CreateEventW` and not yet closed.
                unsafe { CloseHandle(self.data_ready) };
                self.data_ready = 0;
            }
        }
    }

    impl Drop for NamedEventStream {
        /// Stops the waiter thread and waits for it to exit before the handles are closed.
        ///
        /// Closing a handle while another thread is still waiting on it is undefined behavior
        /// for the Win32 wait functions, so the thread must be gone before `handles` is dropped.
        fn drop(&mut self) {
            // SAFETY: `handles.shutdown` stays open until `handles` is dropped after this body.
            let signaled = unsafe { SetEvent(self.handles.shutdown) } != 0;
            if let Some(worker) = self.thread.take() {
                // Only join when the stop request was delivered; otherwise the thread could
                // wait forever and joining would hang the dropping thread.
                if signaled {
                    let _ = worker.join();
                }
            }
        }
    }

    impl NamedEventStream {
        /// Creates a stream for the given event name. The event is created or opened so this process
        /// can wait and reset it. Returns `None` if the event could not be created/opened, or if the
        /// internal shutdown event or the waiter thread could not be created.
        pub(crate) fn new(event_name: &str) -> Option<Self> {
            let name_wide: Vec<u16> = event_name.encode_utf16().chain(std::iter::once(0)).collect();
            // SAFETY: `name_wide` is NUL-terminated and outlives the call; null attributes are allowed.
            let data_ready = unsafe {
                CreateEventW(
                    std::ptr::null_mut(),
                    1, // bManualReset = TRUE
                    0, // bInitialState = FALSE
                    name_wide.as_ptr(),
                )
            };
            if data_ready == 0 {
                return None;
            }
            // The shutdown event is deliberately unnamed: a named one would be shared by every
            // stream (in any process) using the same `event_name`, so dropping one stream would
            // stop the others, and a stale signaled event would make a new waiter exit at once.
            // SAFETY: null attributes and a null name are valid arguments for `CreateEventW`.
            let shutdown = unsafe {
                CreateEventW(
                    std::ptr::null_mut(),
                    1, // bManualReset = TRUE
                    0, // bInitialState = FALSE
                    std::ptr::null(),
                )
            };
            if shutdown == 0 {
                // SAFETY: `data_ready` was just returned by `CreateEventW` and is still open.
                unsafe { CloseHandle(data_ready) };
                return None;
            }
            // From here on `handles` closes both events on every exit path.
            let handles = EventHandles {
                data_ready,
                shutdown,
            };
            let notify = Arc::new(Notify::new());
            let waiter_notify = Arc::clone(&notify);
            let worker = thread::Builder::new()
                .spawn(move || {
                    let wait_set = [data_ready, shutdown];
                    loop {
                        // SAFETY: both handles stay open until this thread has been joined.
                        let signaled = unsafe {
                            WaitForMultipleObjects(2, wait_set.as_ptr(), 0, INFINITE)
                        };
                        if signaled != WAIT_OBJECT_0 {
                            // Shutdown requested (index 1) or the wait failed.
                            break;
                        }
                        // Reset before notifying: a signal raised after the reset keeps the
                        // event set and triggers another round, so no wake-up is lost.
                        // SAFETY: `data_ready` is a valid, open event handle.
                        let reset_ok = unsafe { ResetEvent(data_ready) } != 0;
                        waiter_notify.notify_one();
                        if !reset_ok {
                            break;
                        }
                    }
                })
                .ok()?;
            let notified = Box::pin(Arc::clone(&notify).notified_owned());
            Some(Self {
                notify,
                notified,
                thread: Some(worker),
                handles,
            })
        }
    }

    impl Stream for NamedEventStream {
        type Item = ();

        /// Yields `Some(())` once the waiter thread has notified; never yields `None`.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            match std::future::Future::poll(this.notified.as_mut(), cx) {
                Poll::Ready(()) => {
                    // Arm a fresh future for the next notification before reporting this one.
                    this.notified = Box::pin(Arc::clone(&this.notify).notified_owned());
                    Poll::Ready(Some(()))
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }
}

/// Returns a stream that yields once each time the named Windows event is signaled.
///
/// Thin wrapper around `stream_impl::NamedEventStream::new`. The stream does not block the
/// async executor: a dedicated thread waits on the event and resets it for every signal it
/// observes, so the next wait blocks until the next signal.
///
/// Returns `None` if the event could not be created or opened (e.g. name invalid).
#[cfg(windows)]
pub(crate) fn named_event_stream(event_name: &str) -> Option<stream_impl::NamedEventStream> {
    stream_impl::NamedEventStream::new(event_name)
}

#[cfg(unix)]
mod stream_impl {
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::sync::mpsc;
    use std::task::{Context, Poll};
    use std::thread::{self, JoinHandle};
    use std::time::Duration;

    use futures_util::Stream;
    use tokio::sync::Notify;
    use tokio::sync::futures::OwnedNotified;

    /// Async stream that yields one item each time the named POSIX semaphore is posted.
    /// A dedicated thread waits on the semaphore via timed wait (without blocking the async executor),
    /// notifies when signaled, then waits again. The stream yields `()` for each notification.
    // NOTE(review): `Notify::notify_one` presumably stores at most one pending permit, so posts
    // that arrive faster than the stream is polled may coalesce into one item — confirm.
    pub(crate) struct NamedEventStream {
        /// Notifier shared with the waiter thread, which calls `notify_one` after each post.
        notify: Arc<Notify>,
        /// Pending notification future polled by `poll_next`; replaced once it completes.
        notified: Option<Pin<Box<OwnedNotified>>>,
        /// Stop flag shared with the waiter thread; set to `true` when the stream is dropped.
        shutdown: Arc<AtomicBool>,
        /// Handle of the waiter thread. It is never joined; the thread exits on its own.
        _thread: JoinHandle<()>,
    }

    impl NamedEventStream {
        /// Creates a stream for the given semaphore name. The semaphore is created or opened inside a dedicated thread
        /// so the semaphore handle is not sent across threads. Returns `None` if the semaphore could not be created/opened.
        pub(crate) fn new(event_name: &str) -> Option<Self> {
            let event_name = event_name.to_string();
            let (tx, rx) = mpsc::channel::<Result<(), ()>>();
            let notify = Arc::new(Notify::new());
            let notify_clone = Arc::clone(&notify);
            let shutdown = Arc::new(AtomicBool::new(false));
            let shutdown_clone = Arc::clone(&shutdown);
            const POLL_MS: u64 = 100;
            let _thread = thread::spawn(move || {
                let mut sem = match named_sem::NamedSemaphore::create(&event_name, 0) {
                    Ok(s) => s,
                    Err(_) => {
                        let _ = tx.send(Err(()));
                        return;
                    }
                };
                let _ = tx.send(Ok(()));
                loop {
                    match sem.timed_wait(Duration::from_millis(POLL_MS)) {
                        Ok(()) => {
                            notify_clone.notify_one();
                        }
                        Err(named_sem::Error::WaitTimeout) => {
                            if shutdown_clone.load(Ordering::Relaxed) {
                                break;
                            }
                        }
                        Err(_) => break,
                    }
                }
            });
            if rx.recv() != Ok(Ok(())) {
                return None;
            }
            let notified = Some(Box::pin(Arc::clone(&notify).notified_owned()));
            Some(Self {
                notify,
                notified,
                shutdown,
                _thread,
            })
        }
    }

    impl Drop for NamedEventStream {
        /// Asks the waiter thread to stop by setting the shared shutdown flag.
        /// The thread is not joined here; it exits once it next reads the flag.
        fn drop(&mut self) {
            self.shutdown.store(true, Ordering::Relaxed);
        }
    }

    impl Stream for NamedEventStream {
        type Item = ();

        /// Polls the pending notification future. When it completes, a fresh future is armed
        /// and `Some(())` is yielded; otherwise the future is put back and the poll is pending.
        /// This stream never yields `None`.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            let Some(mut pending) = this.notified.take() else {
                return Poll::Pending;
            };
            match std::future::Future::poll(pending.as_mut(), cx) {
                Poll::Ready(()) => {
                    let next = Arc::clone(&this.notify).notified_owned();
                    this.notified = Some(Box::pin(next));
                    Poll::Ready(Some(()))
                }
                Poll::Pending => {
                    this.notified = Some(pending);
                    Poll::Pending
                }
            }
        }
    }
}

/// Returns a stream that yields once each time the named POSIX semaphore is posted (Unix targets).
///
/// Thin wrapper around `stream_impl::NamedEventStream::new`. The stream does not block the
/// async executor: a dedicated thread waits on the semaphore with a timeout.
///
/// Returns `None` if the semaphore could not be created or opened.
#[cfg(unix)]
pub(crate) fn named_event_stream(event_name: &str) -> Option<stream_impl::NamedEventStream> {
    stream_impl::NamedEventStream::new(event_name)
}

/// Error from waiting for recipient acknowledgment.
#[derive(Debug)]
pub(crate) enum AckWaitError {
    /// The acknowledgment was not signaled before the timeout expired.
    Timeout,
    /// The event/semaphore could not be created or opened. `wait_for_ack` also returns this
    /// variant when the wait itself fails for any reason other than a timeout.
    CreateOpenFailed,
}

/// Blocks until the named "data acked" event (Windows) or semaphore (Unix) is signaled, or the timeout expires.
/// Must be called from a blocking context (e.g. `spawn_blocking`); do not call from the async executor.
///
/// On success the manual-reset event is reset before returning, so an acknowledgment is
/// consumed exactly once (matching the Unix semaphore variant) and cannot satisfy a later call
/// if another handle keeps the event object alive.
///
/// # Errors
/// * `AckWaitError::Timeout` – the event was not signaled within `timeout_ms` milliseconds.
/// * `AckWaitError::CreateOpenFailed` – the event could not be created/opened, or the wait failed.
#[cfg(windows)]
pub(crate) fn wait_for_ack(ack_event_name: &str, timeout_ms: u32) -> Result<(), AckWaitError> {
    use windows_sys::Win32::Foundation::{CloseHandle, WAIT_OBJECT_0};
    use windows_sys::Win32::System::Threading::{CreateEventW, ResetEvent, WaitForSingleObject};
    // Numeric value of the Win32 `WAIT_TIMEOUT` status.
    const WAIT_TIMEOUT: u32 = 258;

    let name_wide: Vec<u16> = ack_event_name.encode_utf16().chain(std::iter::once(0)).collect();
    // SAFETY: `name_wide` is NUL-terminated and outlives the call; null attributes are allowed.
    let handle = unsafe {
        CreateEventW(
            std::ptr::null_mut(),
            1, // bManualReset = TRUE
            0, // bInitialState = FALSE
            name_wide.as_ptr(),
        )
    };
    if handle == 0 {
        return Err(AckWaitError::CreateOpenFailed);
    }

    // SAFETY: `handle` is a valid event handle owned by this function.
    let wait_result = unsafe { WaitForSingleObject(handle, timeout_ms) };
    if wait_result == WAIT_OBJECT_0 {
        // Consume the acknowledgment; a manual-reset event would otherwise stay signaled.
        // SAFETY: `handle` is still open.
        unsafe { ResetEvent(handle) };
    }
    // SAFETY: `handle` is open and is not used after this point.
    unsafe { CloseHandle(handle) };

    if wait_result == WAIT_OBJECT_0 {
        Ok(())
    } else if wait_result == WAIT_TIMEOUT {
        Err(AckWaitError::Timeout)
    } else {
        Err(AckWaitError::CreateOpenFailed)
    }
}

#[cfg(unix)]
pub(crate) fn wait_for_ack(ack_event_name: &str, timeout_ms: u32) -> Result<(), AckWaitError> {
    use std::time::Duration;
    let mut sem = named_sem::NamedSemaphore::create(ack_event_name, 0)
        .map_err(|_| AckWaitError::CreateOpenFailed)?;
    match sem.timed_wait(Duration::from_millis(timeout_ms as u64)) {
        Ok(()) => Ok(()),
        Err(named_sem::Error::WaitTimeout) => Err(AckWaitError::Timeout),
        Err(_) => Err(AckWaitError::CreateOpenFailed),
    }
}

#[cfg(all(test, unix))]
mod tests {
    use std::time::Duration;

    use futures_util::StreamExt;

    use super::*;
    use crate::event_sender;

    /// Unix: `data_ready_event_name` must produce a valid POSIX semaphore name, i.e. a single
    /// leading `/` and no other slashes, and it must mention `data_ready`.
    #[test]
    fn data_ready_event_name_linux_format() {
        let sem_name = event_sender::data_ready_event_name("mypipe");
        let slash_count = sem_name.chars().filter(|&c| c == '/').count();
        assert!(sem_name.starts_with('/'), "POSIX semaphore name must start with /");
        assert_eq!(slash_count, 1, "name must contain no further slashes");
        assert!(sem_name.contains("data_ready"));
    }

    /// Unix: the stream yields an item after the semaphore is posted from another thread.
    #[test]
    fn named_semaphore_stream_yields_on_signal() {
        // The process id keeps the semaphore name unique per test process.
        let event_name =
            event_sender::data_ready_event_name(&format!("test_{}", std::process::id()));
        let mut stream = named_event_stream(&event_name)
            .expect("named_event_stream should succeed with valid name");

        let (done_tx, done_rx) = std::sync::mpsc::channel();
        let poster_name = event_name.clone();
        std::thread::spawn(move || {
            // Give the main thread time to start awaiting the stream before posting.
            std::thread::sleep(Duration::from_millis(50));
            event_sender::signal_named_event(&poster_name);
            let _ = done_tx.send(());
        });

        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let first = runtime.block_on(stream.next());
        assert_eq!(first, Some(()));
        // Best effort: let the poster thread finish before the test returns.
        let _ = done_rx.recv_timeout(Duration::from_secs(1));
    }
}