varta-watch 0.2.0

Varta observer — receives VLP frames and surfaces stalls.
Documentation
//! Public receive path — `enable_credential_passing` and `recv_authenticated`.
//!
//! These are the two functions the observer's receive path relies on:
//! `enable_credential_passing` is called once at setup, and
//! `recv_authenticated` once per datagram in the poll loop. The body is intentionally
//! cfg-branched on `target_os` for the `Msghdr` field-order differences;
//! everything else delegates to `super::plat` and `super::cmsg_*`.

use std::io;

use super::plat;
use super::{observer_uid, read_pid_namespace_inode, BeatOrigin, RecvResult};

/// Ask the kernel to attach the sender's credentials to each datagram
/// received on `fd`.
///
/// Call this once, after the observer has bound its socket and before the
/// first call to [`recv_authenticated`].
///
/// The socket option that gets set depends on the target OS:
///
/// - Linux: `SO_PASSCRED`, so every datagram carries `SCM_CREDENTIALS`
///   ancillary data.
/// - FreeBSD / DragonFly / NetBSD: `LOCAL_CREDS`, so every datagram carries
///   `SCM_CREDS` ancillary data (struct cmsgcred).
/// - illumos / Solaris: `SO_RECVUCRED`, so every datagram carries
///   `SCM_UCRED` ancillary data (opaque `ucred_t`).
/// - macOS and all other platforms: nothing is set. macOS uses `getsockopt`
///   after each recvmsg; other platforms fall back to socket-mode-only
///   defence.
///
/// # Errors
///
/// Returns `io::Error::last_os_error()` when `setsockopt` reports a non-zero
/// status. The no-op path always returns `Ok(())`.
pub(crate) fn enable_credential_passing(fd: i32) -> io::Result<()> {
    // Only the option name differs between the supported platforms, so it is
    // selected here and the `setsockopt` call below is shared.
    #[cfg(target_os = "linux")]
    let cred_option = plat::SO_PASSCRED;
    #[cfg(any(target_os = "freebsd", target_os = "dragonfly", target_os = "netbsd"))]
    let cred_option = plat::LOCAL_CREDS;
    #[cfg(any(target_os = "illumos", target_os = "solaris"))]
    let cred_option = plat::SO_RECVUCRED;

    #[cfg(any(
        target_os = "linux",
        target_os = "freebsd",
        target_os = "dragonfly",
        target_os = "netbsd",
        target_os = "illumos",
        target_os = "solaris",
    ))]
    {
        let enabled: i32 = 1;
        // SAFETY: `setsockopt(2)` reads `optlen` bytes from `optval` for
        // `SO_PASSCRED` (Linux, see `socket(7)` / `unix(7)`), `LOCAL_CREDS`
        // (BSD, see `unix(4)`) and `SO_RECVUCRED` (illumos/Solaris, see
        // `socket(3SOCKET)`).
        // - `fd` is the observer's UDS receive socket (freshly bound by the
        //   listener immediately before this call).
        // - `addr_of!(enabled)` is a valid pointer to a stack-local i32 that
        //   outlives the call.
        // - `optlen == size_of::<i32>()` matches what the kernel reads.
        // - The return value is checked; on error we surface the errno.
        let status = unsafe {
            plat::setsockopt(
                fd,
                plat::SOL_SOCKET,
                cred_option,
                core::ptr::addr_of!(enabled).cast::<core::ffi::c_void>(),
                core::mem::size_of::<i32>() as u32,
            )
        };
        if status == 0 {
            Ok(())
        } else {
            Err(io::Error::last_os_error())
        }
    }

    // No per-datagram credential option to enable on this platform.
    #[cfg(not(any(
        target_os = "linux",
        target_os = "freebsd",
        target_os = "dragonfly",
        target_os = "netbsd",
        target_os = "illumos",
        target_os = "solaris",
    )))]
    {
        let _ = fd;
        Ok(())
    }
}

/// Read a single datagram from `fd` and report who sent it, as attested by
/// the kernel where the platform supports that.
///
/// Outcomes:
///
/// - [`RecvResult::Authenticated`] — a full 32-byte frame was read. Carries
///   the peer PID, peer UID, optional PID-namespace inode, the
///   [`BeatOrigin`] and the payload.
/// - [`RecvResult::WouldBlock`] — the read failed with `WouldBlock` or
///   `TimedOut`.
/// - [`RecvResult::ShortRead`] — the byte count was not exactly 32.
/// - [`RecvResult::CtrlTruncated`] — `plat::ctrl_truncated` reported that
///   the ancillary data did not fit (kernel-attested path only).
/// - [`RecvResult::IoError`] — any other OS error, a datagram without peer
///   credentials (`InvalidData`), or a peer UID that differs from the
///   observer's UID (`PermissionDenied`).
///
/// Reads interrupted by a signal (`Interrupted`) are retried transparently.
///
/// On platforms with per-datagram kernel credential passing (Linux, macOS,
/// FreeBSD, DragonFly, NetBSD, illumos, Solaris) the returned beat carries
/// `origin = BeatOrigin::KernelAttested`. On all other platforms — or when
/// the `force-socketmode-fallback` feature is enabled — the beat carries
/// `origin = BeatOrigin::SocketModeOnly` with `peer_pid = 0`; recovery
/// commands are refused for those beats.
pub(crate) fn recv_authenticated(fd: i32) -> RecvResult {
    // --- Kernel-attested path (Linux / macOS / BSD / illumos / Solaris) ----
    //
    // Compiled out when `force-socketmode-fallback` is active so that the
    // generic fallback block below is reached on any host — this lets the
    // integration test exercise `BeatOrigin::SocketModeOnly` on Linux CI.
    #[cfg(all(
        not(feature = "force-socketmode-fallback"),
        any(
            target_os = "linux",
            target_os = "macos",
            target_os = "freebsd",
            target_os = "dragonfly",
            target_os = "netbsd",
            target_os = "illumos",
            target_os = "solaris",
        )
    ))]
    {
        // 8-byte-aligned backing storage for the control (ancillary) data.
        #[repr(align(8))]
        struct AncBuf([u8; plat::ANCILLARY_BUFFER_SIZE]);

        let mut data = [0u8; 32];
        let mut anc = AncBuf([0u8; plat::ANCILLARY_BUFFER_SIZE]);

        let mut iov = plat::Iovec {
            iov_base: data.as_mut_ptr().cast::<core::ffi::c_void>(),
            iov_len: 32,
        };
        let mut mhdr = plat::msghdr_for_recv(
            &mut iov,
            anc.0.as_mut_ptr().cast::<core::ffi::c_void>(),
            plat::ANCILLARY_BUFFER_SIZE,
        );

        // Retry on EINTR; leave the loop only with a non-negative byte count.
        let n = loop {
            // SAFETY: `recvmsg(2)` reads from `fd` and writes:
            //   - at most `iov.iov_len` (= 32) bytes into the `data` stack
            //     array through `iov.iov_base`;
            //   - at most `ANCILLARY_BUFFER_SIZE` bytes of ancillary data
            //     into the `anc` stack array through `mhdr.msg_control`;
            //   - the actual control length into `mhdr.msg_controllen` and
            //     the flags into `mhdr.msg_flags`.
            // Every pointed-to buffer lives on this function's stack for the
            // whole call. The `Msghdr` field layout is verified at compile
            // time by `offset_of!` assertions in `plat`. `&mut mhdr` is the
            // single exclusive borrow for the duration of the call. The
            // return value is checked right below: `< 0` is errno, `>= 0` is
            // the byte count.
            let ret = unsafe { plat::recvmsg(fd, &mut mhdr, 0) };
            if ret >= 0 {
                break ret;
            }

            let err = io::Error::last_os_error();
            match err.kind() {
                io::ErrorKind::Interrupted => continue,
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
                    return RecvResult::WouldBlock;
                }
                _ => return RecvResult::IoError(err),
            }
        };

        // Truncated control data is reported before the payload length is
        // even looked at.
        if plat::ctrl_truncated(&mhdr) {
            return RecvResult::CtrlTruncated(io::Error::new(
                io::ErrorKind::InvalidData,
                "ancillary data truncated by kernel (ANCILLARY_BUFFER_SIZE too small)",
            ));
        }

        // NOTE(review): a datagram longer than 32 bytes would presumably be
        // cut to 32 by the kernel and pass this check — confirm whether
        // payload truncation should be detected as well.
        if n as usize != 32 {
            return RecvResult::ShortRead;
        }

        let (peer_pid, peer_uid) = if let Some(creds) = plat::peer_pid_after_recv(fd, &mhdr) {
            creds
        } else {
            return RecvResult::IoError(io::Error::new(
                io::ErrorKind::InvalidData,
                "kernel did not attach peer credentials",
            ));
        };

        // Reject senders running under a different UID than the observer.
        // The comparison is skipped when the reported PID is 0.
        let my_uid = observer_uid();
        if peer_pid != 0 && peer_uid != my_uid {
            return RecvResult::IoError(io::Error::new(
                io::ErrorKind::PermissionDenied,
                format!(
                    "peer credential UID mismatch: kernel reports uid {peer_uid}, expected uid {my_uid}"
                ),
            ));
        }

        // Look up the peer's PID-namespace inode (Linux only), after the UID
        // check so the strongest signal fires first. `None` means non-Linux
        // or an unreadable /proc/<pid>/ns/pid — the cross-ns gate downstream
        // short-circuits to "match" in that case.
        let peer_pid_ns_inode = if peer_pid == 0 {
            None
        } else {
            read_pid_namespace_inode(peer_pid)
        };

        RecvResult::Authenticated {
            peer_pid,
            peer_uid,
            peer_pid_ns_inode,
            origin: BeatOrigin::KernelAttested,
            data,
        }
    }

    // --- Socket-mode-only fallback (OpenBSD, AIX, HP-UX, … or test mode) --
    //
    // Used on platforms without per-datagram kernel credential passing, and
    // on any platform when `force-socketmode-fallback` is active. The only
    // defence is `--socket-mode 0600`; any process under the same UID can
    // reach this socket and forge `frame.pid`. Beats are tagged
    // `SocketModeOnly`; the recovery gate refuses to spawn commands for them.
    #[cfg(any(
        feature = "force-socketmode-fallback",
        not(any(
            target_os = "linux",
            target_os = "macos",
            target_os = "freebsd",
            target_os = "dragonfly",
            target_os = "netbsd",
            target_os = "illumos",
            target_os = "solaris",
        ))
    ))]
    {
        extern "C" {
            fn recv(fd: i32, buf: *mut core::ffi::c_void, len: usize, flags: i32) -> isize;
        }

        let mut data = [0u8; 32];

        // Retry on EINTR; leave the loop only with a non-negative byte count.
        let n = loop {
            // SAFETY: `recv(2)` writes at most `len` (= 32) bytes into
            // `data`, a stack-allocated 32-byte array that outlives the
            // call. Return value: `< 0` is errno, `>= 0` is the byte count.
            let ret = unsafe {
                recv(
                    fd,
                    data.as_mut_ptr().cast::<core::ffi::c_void>(),
                    data.len(),
                    0,
                )
            };
            if ret >= 0 {
                break ret;
            }

            let err = io::Error::last_os_error();
            match err.kind() {
                io::ErrorKind::Interrupted => continue,
                io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
                    return RecvResult::WouldBlock;
                }
                _ => return RecvResult::IoError(err),
            }
        };

        if n as usize != 32 {
            return RecvResult::ShortRead;
        }

        // No kernel attestation available: PID and UID are reported as 0.
        RecvResult::Authenticated {
            peer_pid: 0,
            peer_uid: 0,
            peer_pid_ns_inode: None,
            origin: BeatOrigin::SocketModeOnly,
            data,
        }
    }
}