rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! OS-level fd helpers shared by all runtimes.
//!
//! Linux uses `eventfd` (single fd, u64 counter). Other unix uses a `pipe2`
//! pair (single byte per signal, drain to coalesce). Windows has no fd path
//! here — runtime modules fall back to a userspace primitive.

#![cfg(unix)]

use std::io;
#[cfg(unix)]
use std::os::fd::{AsRawFd, BorrowedFd, OwnedFd};

#[cfg(target_os = "linux")]
pub(crate) fn make_eventfd() -> io::Result<OwnedFd> {
    use std::os::fd::FromRawFd;
    let flags = libc::EFD_CLOEXEC | libc::EFD_NONBLOCK;
    let fd = unsafe { libc::eventfd(0, flags) };
    if fd < 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(unsafe { OwnedFd::from_raw_fd(fd) })
}

#[cfg(target_os = "linux")]
pub(crate) fn signal_write_eventfd(fd: BorrowedFd<'_>) -> io::Result<()> {
    let one: u64 = 1;
    let buf = one.to_ne_bytes();
    loop {
        let n = unsafe {
            libc::write(
                fd.as_raw_fd(),
                buf.as_ptr().cast::<libc::c_void>(),
                buf.len(),
            )
        };
        if n == buf.len() as isize {
            return Ok(());
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            // Counter saturated (2^64 - 2): a wake is already pending.
            // Edge-triggered semantics — coalesce silently.
            // EAGAIN: wake-already-pending. EWOULDBLOCK is the same value
            // on every supported platform — the explicit alt would be
            // `Some(x) if x == libc::EWOULDBLOCK`, but it's redundant.
            Some(libc::EAGAIN) => return Ok(()),
            Some(libc::EINTR) => {}
            _ => return Err(err),
        }
    }
}

#[cfg(target_os = "linux")]
pub(crate) fn drain_eventfd(fd: BorrowedFd<'_>) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    loop {
        let n = unsafe {
            libc::read(
                fd.as_raw_fd(),
                buf.as_mut_ptr().cast::<libc::c_void>(),
                buf.len(),
            )
        };
        if n == buf.len() as isize {
            return Ok(u64::from_ne_bytes(buf));
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            Some(libc::EINTR) => {}
            _ => return Err(err),
        }
    }
}

#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) fn make_pipe() -> io::Result<(OwnedFd, OwnedFd)> {
    use std::os::fd::FromRawFd;
    let mut fds = [0 as libc::c_int; 2];
    // pipe2 isn't on macOS; use pipe + manual flag setup.
    let rc = unsafe { libc::pipe(fds.as_mut_ptr()) };
    if rc < 0 {
        return Err(io::Error::last_os_error());
    }
    let (r, w) = unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) };
    set_cloexec_nonblock(r.as_raw_fd())?;
    set_cloexec_nonblock(w.as_raw_fd())?;
    Ok((r, w))
}

#[cfg(all(unix, not(target_os = "linux")))]
fn set_cloexec_nonblock(fd: libc::c_int) -> io::Result<()> {
    unsafe {
        let flags = libc::fcntl(fd, libc::F_GETFL);
        if flags < 0 {
            return Err(io::Error::last_os_error());
        }
        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
            return Err(io::Error::last_os_error());
        }
        let fd_flags = libc::fcntl(fd, libc::F_GETFD);
        if fd_flags < 0 {
            return Err(io::Error::last_os_error());
        }
        if libc::fcntl(fd, libc::F_SETFD, fd_flags | libc::FD_CLOEXEC) < 0 {
            return Err(io::Error::last_os_error());
        }
    }
    Ok(())
}

#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) fn signal_write_pipe(fd: BorrowedFd<'_>) -> io::Result<()> {
    let buf = [0u8; 1];
    loop {
        let n = unsafe {
            libc::write(
                fd.as_raw_fd(),
                buf.as_ptr().cast::<libc::c_void>(),
                buf.len(),
            )
        };
        if n == 1 {
            return Ok(());
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            // Pipe full: a wake is already pending in the buffer.
            // EAGAIN: wake-already-pending. EWOULDBLOCK is the same value
            // on every supported platform — the explicit alt would be
            // `Some(x) if x == libc::EWOULDBLOCK`, but it's redundant.
            Some(libc::EAGAIN) => return Ok(()),
            Some(libc::EINTR) => {}
            _ => return Err(err),
        }
    }
}

#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) fn drain_pipe(fd: BorrowedFd<'_>) -> io::Result<usize> {
    let mut buf = [0u8; 64];
    let mut total = 0usize;
    loop {
        let n = unsafe {
            libc::read(
                fd.as_raw_fd(),
                buf.as_mut_ptr().cast::<libc::c_void>(),
                buf.len(),
            )
        };
        if n > 0 {
            total += n as usize;
            if (n as usize) < buf.len() {
                return Ok(total);
            }
            continue;
        }
        if n == 0 {
            return Ok(total);
        }
        let err = io::Error::last_os_error();
        match err.raw_os_error() {
            Some(libc::EAGAIN) if total > 0 => return Ok(total),
            Some(libc::EINTR) => {}
            _ => return Err(err),
        }
    }
}