rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Tokio-backed signaler.

use super::AsyncSignaler;

#[cfg(unix)]
use super::os;

use std::future::Future;
use std::io;

/// Edge-triggered fd-or-Notify signaler. On Linux uses `eventfd` registered
/// with the tokio reactor via `AsyncFd`; on other unix uses a `pipe2`. On
/// Windows falls back to `tokio::sync::Notify` (no `AsyncFd` analogue).
///
/// All behavior is delegated to the platform-selected `Inner` backend.
pub(crate) struct TokioSignaler {
    // Platform backend chosen at compile time through the `Inner` alias.
    inner: Inner,
}

// Compile-time backend selection: one alias applies per supported target
// (Linux, other unix, Windows).
// NOTE(review): no alias is defined for targets that are neither unix nor
// windows — confirm such targets are out of scope.
#[cfg(target_os = "linux")]
type Inner = EventFdInner;
#[cfg(all(unix, not(target_os = "linux")))]
type Inner = PipeInner;
#[cfg(windows)]
type Inner = NotifyInner;

impl TokioSignaler {
    /// Builds a signaler on top of the platform-selected `Inner` backend.
    ///
    /// # Errors
    ///
    /// Propagates any `io::Error` returned by `Inner::new`.
    pub(crate) fn new() -> io::Result<Self> {
        Inner::new().map(|inner| Self { inner })
    }
}

impl AsyncSignaler for TokioSignaler {
    #[inline]
    fn signal(&self) {
        self.inner.signal();
    }

    fn signaled(&self) -> impl Future<Output = ()> + Send + '_ {
        self.inner.signaled()
    }
}

// ---- Linux: eventfd ----

/// Linux backend: a single descriptor obtained from `os::make_eventfd`,
/// wrapped in tokio's `AsyncFd`.
#[cfg(target_os = "linux")]
pub(crate) struct EventFdInner {
    // Registered for read-readiness only; used for both writing the signal
    // and draining it.
    fd: ::tokio::io::unix::AsyncFd<std::os::fd::OwnedFd>,
}

#[cfg(target_os = "linux")]
impl EventFdInner {
    /// Creates the descriptor via `os::make_eventfd` and registers it with
    /// tokio for read-readiness only.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from creating the descriptor or from
    /// registering it with `AsyncFd`.
    fn new() -> io::Result<Self> {
        use ::tokio::io::{unix::AsyncFd, Interest};

        let eventfd = os::make_eventfd()?;
        AsyncFd::with_interest(eventfd, Interest::READABLE).map(|fd| Self { fd })
    }

    /// Writes a signal to the eventfd; the outcome is deliberately discarded.
    fn signal(&self) {
        use std::os::fd::AsFd;

        // A failed write (other than EAGAIN) would point at a runtime bug,
        // because the descriptor is owned here and never closed from the
        // outside. Discarding the result keeps this hot path panic-free; the
        // waiter simply stays parked until a later signal gets through.
        let target = self.fd.get_ref().as_fd();
        let _ = os::signal_write_eventfd(target);
    }

    /// Resolves once a wake-up has been observed on the eventfd.
    ///
    /// Also resolves when waiting for readiness fails, or when draining
    /// fails with anything other than `WouldBlock`.
    async fn signaled(&self) {
        use std::os::fd::AsFd;

        // `readable()` is cancel-safe and edge-aware. If it errors the
        // reactor has shut down; falling out of the loop treats that as a
        // wake.
        while let Ok(mut ready) = self.fd.readable().await {
            // Drain on every wake so the kernel's readiness and our view of
            // it stay aligned.
            let drained = os::drain_eventfd(self.fd.get_ref().as_fd());
            let nothing_pending =
                matches!(&drained, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !nothing_pending {
                return;
            }
            // Spurious readiness: reset it and wait again.
            ready.clear_ready();
        }
    }
}

// ---- Other unix: pipe ----

/// Non-Linux unix backend: the two descriptors returned by `os::make_pipe`.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) struct PipeInner {
    // Receiving side, registered with tokio for read-readiness and drained
    // by `signaled`.
    rx: ::tokio::io::unix::AsyncFd<std::os::fd::OwnedFd>,
    // Sending side, written to by `signal`.
    tx: std::sync::Arc<std::os::fd::OwnedFd>,
}

#[cfg(all(unix, not(target_os = "linux")))]
impl PipeInner {
    /// Creates the pipe via `os::make_pipe` and registers its first
    /// descriptor with tokio for read-readiness only.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from creating the pipe or from registering
    /// the receiving descriptor with `AsyncFd`.
    fn new() -> io::Result<Self> {
        use ::tokio::io::{unix::AsyncFd, Interest};

        let (read_end, write_end) = os::make_pipe()?;
        let rx = AsyncFd::with_interest(read_end, Interest::READABLE)?;
        let tx = std::sync::Arc::new(write_end);
        Ok(Self { rx, tx })
    }

    /// Writes a signal into the pipe; the outcome is deliberately discarded.
    fn signal(&self) {
        use std::os::fd::AsFd;

        let target = self.tx.as_fd();
        let _ = os::signal_write_pipe(target);
    }

    /// Resolves once a wake-up has been observed on the pipe.
    ///
    /// Also resolves when waiting for readiness fails, or when draining
    /// fails with anything other than `WouldBlock`.
    async fn signaled(&self) {
        use std::os::fd::AsFd;

        // A readiness error ends the loop, which counts as a wake.
        while let Ok(mut ready) = self.rx.readable().await {
            let drained = os::drain_pipe(self.rx.get_ref().as_fd());
            let nothing_pending =
                matches!(&drained, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !nothing_pending {
                return;
            }
            // Spurious readiness: reset it and wait again.
            ready.clear_ready();
        }
    }
}

// ---- Windows: Notify fallback ----

/// Windows backend: no file descriptor, only a tokio `Notify`.
#[cfg(windows)]
pub(crate) struct NotifyInner {
    // `signal` calls `notify_one` on it; `signaled` awaits `notified()`.
    notify: std::sync::Arc<::tokio::sync::Notify>,
}

#[cfg(windows)]
impl NotifyInner {
    /// Creates the backend around a fresh `Notify`.
    ///
    /// Never fails; the `io::Result` return type mirrors the constructors of
    /// the other backends so `Inner::new()` has one shape on every platform.
    fn new() -> io::Result<Self> {
        let notify = std::sync::Arc::new(::tokio::sync::Notify::new());
        Ok(Self { notify })
    }

    /// Raises the signal through `Notify::notify_one`.
    fn signal(&self) {
        let notifier = &self.notify;
        notifier.notify_one();
    }

    /// Resolves once the `Notify` delivers a notification.
    async fn signaled(&self) {
        let wakeup = self.notify.notified();
        wakeup.await;
    }
}