rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Smol-backed signaler.

use super::AsyncSignaler;

#[cfg(unix)]
use super::os;

use std::future::Future;
use std::io;

/// Edge-triggered fd-or-Event signaler. On Linux uses `eventfd` registered
/// with smol's reactor via `async_io::Async`; on other unix uses a `pipe2`.
/// On Windows falls back to `event_listener::Event`.
pub(crate) struct SmolSignaler {
    inner: Inner,
}

#[cfg(target_os = "linux")]
type Inner = EventFdInner;
#[cfg(all(unix, not(target_os = "linux")))]
type Inner = PipeInner;
#[cfg(windows)]
type Inner = EventInner;

impl SmolSignaler {
    pub(crate) fn new() -> io::Result<Self> {
        Ok(Self {
            inner: Inner::new()?,
        })
    }
}

impl AsyncSignaler for SmolSignaler {
    #[inline]
    fn signal(&self) {
        self.inner.signal();
    }

    fn signaled(&self) -> impl Future<Output = ()> + Send + '_ {
        self.inner.signaled()
    }
}

// ---- Linux: eventfd ----

#[cfg(target_os = "linux")]
pub(crate) struct EventFdInner {
    fd: async_io::Async<std::os::fd::OwnedFd>,
}

#[cfg(target_os = "linux")]
impl EventFdInner {
    fn new() -> io::Result<Self> {
        let raw = os::make_eventfd()?;
        let fd = async_io::Async::new(raw)?;
        Ok(Self { fd })
    }

    fn signal(&self) {
        use std::os::fd::AsFd;
        let _ = os::signal_write_eventfd(self.fd.as_ref().as_fd());
    }

    async fn signaled(&self) {
        use std::os::fd::AsFd;
        loop {
            if self.fd.readable().await.is_err() {
                return;
            }
            match os::drain_eventfd(self.fd.as_ref().as_fd()) {
                // smol's `Async` doesn't expose `clear_ready`; the reactor
                // re-arms on the next poll. WouldBlock means a spurious
                // wake — loop back into `readable()`.
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
                _ => return,
            }
        }
    }
}

// ---- Other unix: pipe ----

#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) struct PipeInner {
    rx: async_io::Async<std::os::fd::OwnedFd>,
    tx: std::sync::Arc<std::os::fd::OwnedFd>,
}

#[cfg(all(unix, not(target_os = "linux")))]
impl PipeInner {
    fn new() -> io::Result<Self> {
        let (r, w) = os::make_pipe()?;
        let rx = async_io::Async::new(r)?;
        Ok(Self {
            rx,
            tx: std::sync::Arc::new(w),
        })
    }

    fn signal(&self) {
        use std::os::fd::AsFd;
        let _ = os::signal_write_pipe(self.tx.as_fd());
    }

    async fn signaled(&self) {
        use std::os::fd::AsFd;
        loop {
            if self.rx.readable().await.is_err() {
                return;
            }
            match os::drain_pipe(self.rx.as_ref().as_fd()) {
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
                _ => return,
            }
        }
    }
}

// ---- Windows: event_listener fallback ----

#[cfg(windows)]
pub(crate) struct EventInner {
    event: event_listener::Event,
}

#[cfg(windows)]
impl EventInner {
    fn new() -> io::Result<Self> {
        Ok(Self {
            event: event_listener::Event::new(),
        })
    }

    fn signal(&self) {
        self.event.notify(1usize);
    }

    async fn signaled(&self) {
        self.event.listen().await;
    }
}