use core::marker::PhantomData;
use std::io::{self, ErrorKind};
use std::os::unix::fs::PermissionsExt;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Count of `UdsListener` binds whose parent-directory fsync failed since the last drain.
static DIR_FSYNC_FAILED: AtomicU64 = AtomicU64::new(0);

/// Returns how many parent-directory fsync failures were recorded by
/// `UdsListener::bind` since the previous call, resetting the counter to zero.
pub fn drain_bind_dir_fsync_failures() -> u64 {
    let counter = &DIR_FSYNC_FAILED;
    counter.swap(0, Ordering::Relaxed)
}
// Minimal hand-written libc bindings used by this module.
//
// NOTE(review): `mode_t` is 16 bits on macOS/BSD but is declared as `u32` here. Only the
// low permission bits are ever used, but confirm this ABI assumption per target.
extern "C" {
    /// `umask(2)`: installs `mode` as the file-creation mask and returns the previous one.
    fn umask(mode: u32) -> u32;

    /// `setsockopt(2)`: sets a socket option; returns 0 on success, -1 with `errno` set.
    fn setsockopt(
        fd: i32,
        level: i32,
        optname: i32,
        optval: *const core::ffi::c_void,
        optlen: u32,
    ) -> i32;

    /// `getsockopt(2)`: reads a socket option; returns 0 on success, -1 with `errno` set.
    fn getsockopt(
        fd: i32,
        level: i32,
        optname: i32,
        optval: *mut core::ffi::c_void,
        optlen: *mut u32,
    ) -> i32;
}
// Socket-option constants used by `set_rcvbuf`, hand-copied from each platform's
// <sys/socket.h> (Linux: <asm/socket.h>) because no `libc` binding is used here.
//
// Linux is NOT uniform across architectures: MIPS and SPARC keep the BSD-derived numbering
// (SOL_SOCKET = 0xffff, SO_RCVBUF = 0x1002). Every other Linux architecture uses the
// asm-generic values (SOL_SOCKET = 1, SO_RCVBUF = 8).

/// `SOL_SOCKET` for Linux architectures using the asm-generic socket ABI.
#[cfg(all(
    target_os = "linux",
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "sparc64",
    ))
))]
const SOL_SOCKET: i32 = 1;

/// `SOL_SOCKET` for BSD-derived socket ABIs (including Linux on MIPS and SPARC).
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "dragonfly",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "illumos",
    target_os = "solaris",
    all(
        target_os = "linux",
        any(
            target_arch = "mips",
            target_arch = "mips64",
            target_arch = "mips32r6",
            target_arch = "mips64r6",
            target_arch = "sparc",
            target_arch = "sparc64",
        )
    ),
))]
const SOL_SOCKET: i32 = 0xffff;

/// `SO_RCVBUF` for Linux architectures using the asm-generic socket ABI.
#[cfg(all(
    target_os = "linux",
    not(any(
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "mips32r6",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "sparc64",
    ))
))]
const SO_RCVBUF: i32 = 8;

/// `SO_RCVBUF` for BSD-derived socket ABIs (including Linux on MIPS and SPARC).
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "dragonfly",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "illumos",
    target_os = "solaris",
    all(
        target_os = "linux",
        any(
            target_arch = "mips",
            target_arch = "mips64",
            target_arch = "mips32r6",
            target_arch = "mips64r6",
            target_arch = "sparc",
            target_arch = "sparc64",
        )
    ),
))]
const SO_RCVBUF: i32 = 0x1002;

// Refuse to build on platforms whose values have not been checked, rather than guess.
#[cfg(not(any(
    target_os = "linux",
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "dragonfly",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "illumos",
    target_os = "solaris",
)))]
compile_error!(
    "varta-watch has no verified SOL_SOCKET / SO_RCVBUF values for this target_os. \
     Verify against the platform's <sys/socket.h> and extend the cfg-any lists in \
     crates/varta-watch/src/listener.rs."
);
/// Token attesting that the process was single-threaded when it was created.
///
/// `UdsListener::bind` temporarily changes the process-wide umask, which would race with
/// file creation on any other thread. The `PhantomData<*const ()>` field makes the token
/// neither `Send` nor `Sync`, so it cannot be moved to or shared with another thread.
/// The check happens only at construction; threads spawned afterwards are not detected.
#[derive(Debug)]
pub struct PreThreadAttestation {
    _no_send: PhantomData<*const ()>,
}

impl PreThreadAttestation {
    /// Probes the thread count and returns a token only if the process is single-threaded.
    ///
    /// # Errors
    /// Returns an `ErrorKind::Other` error if a second thread is detected. On Linux it also
    /// propagates I/O errors from reading `/proc/self/task`. On targets other than Linux and
    /// macOS no probe exists and this always succeeds.
    pub fn new() -> io::Result<Self> {
        Self::probe().map(|()| Self {
            _no_send: PhantomData,
        })
    }

    /// Creates a token without probing.
    ///
    /// # Safety
    /// The caller must guarantee that no other thread can create files while a
    /// `UdsListener::bind` using this token runs, because `bind` changes the
    /// process-wide umask.
    pub unsafe fn new_unchecked() -> Self {
        Self {
            _no_send: PhantomData,
        }
    }

    /// Builds the error reported when more than one thread is observed.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    fn multi_threaded() -> io::Error {
        io::Error::new(
            io::ErrorKind::Other,
            "process is multi-threaded; UdsListener::bind changes \
             umask(2) process-wide and would race concurrent file creation",
        )
    }

    #[cfg(target_os = "linux")]
    fn probe() -> io::Result<()> {
        // One directory entry per thread; observing a second entry is enough to reject.
        let mut threads_seen = 0usize;
        for task in std::fs::read_dir("/proc/self/task")?.take(2) {
            task?;
            threads_seen += 1;
        }
        if threads_seen > 1 {
            return Err(Self::multi_threaded());
        }
        Ok(())
    }

    #[cfg(target_os = "macos")]
    fn probe() -> io::Result<()> {
        extern "C" {
            fn pthread_is_threaded_np() -> i32;
        }
        // SAFETY: takes no arguments and has no preconditions.
        let threaded = unsafe { pthread_is_threaded_np() } != 0;
        if threaded {
            Err(Self::multi_threaded())
        } else {
            Ok(())
        }
    }

    /// No portable probe is available on this target; the check is skipped.
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    fn probe() -> io::Result<()> {
        Ok(())
    }
}
/// Reinstates a saved process umask when dropped.
///
/// Wraps the value returned by the `umask(2)` call that installed a temporary mask. The
/// previous mask is therefore restored on every exit path, including early `?` returns.
struct UmaskGuard(u32);

impl Drop for UmaskGuard {
    fn drop(&mut self) {
        let saved = self.0;
        // SAFETY: umask(2) takes a plain integer, touches no caller memory and always succeeds.
        unsafe { umask(saved) };
    }
}
use crate::peer_cred::{self, RecvResult};
/// Operator-declared trust level of the transport a beat arrived on.
///
/// Used as the recovery trust of the plaintext UDP listener. `Untrusted` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum TransportTrust {
    /// No assurance about who can send on this transport (the default).
    #[default]
    Untrusted,
    /// The operator vouches for the transport.
    Operator,
}
/// A source of beat datagrams that the watcher polls.
///
/// Implementors must be `Send + 'static`. The `drain_*` methods return a counter's value
/// accumulated since the previous drain and reset it. Every default implementation
/// reports `0`, for listeners that never produce that kind of event.
pub trait BeatListener: Send + 'static {
    /// Receives (or attempts to receive) the next datagram.
    fn recv(&mut self) -> RecvResult;

    /// Decryption failures since the last drain (default: always 0).
    fn drain_decrypt_failures(&mut self) -> u64 {
        0
    }

    /// Truncated or wrong-sized datagrams since the last drain (default: always 0).
    fn drain_truncated(&mut self) -> u64 {
        0
    }

    /// Events where per-sender state was full, since the last drain (default: always 0).
    fn drain_sender_state_full(&mut self) -> u64 {
        0
    }

    /// AEAD decryption attempts since the last drain (default: always 0).
    fn drain_aead_attempts(&mut self) -> u64 {
        0
    }
}
/// Unix-datagram beat listener bound to a filesystem path.
pub struct UdsListener {
    /// The bound datagram socket.
    sock: UnixDatagram,
    /// Filesystem path the socket was bound at.
    path: PathBuf,
    /// Device number of the bound socket inode, recorded at bind time.
    bound_dev: u64,
    /// Inode number of the bound socket inode, recorded at bind time.
    bound_ino: u64,
    /// Short reads since the last `drain_truncated`.
    truncated_count: u64,
    /// `SO_RCVBUF` reported by the kernel after configuration (0 if not set).
    rcvbuf_bytes: u32,
}
impl UdsListener {
pub fn bind(
path: impl AsRef<Path>,
socket_mode: u32,
read_timeout: Duration,
uds_rcvbuf_bytes: u32,
_pre_thread: &PreThreadAttestation,
) -> io::Result<Self> {
let path = path.as_ref();
let owned_path: PathBuf = path.to_path_buf();
let restrict_umask = !socket_mode & 0o777;
let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
let bind_result = UnixDatagram::bind(path);
let sock = match bind_result {
Ok(sock) => sock,
Err(e) if e.kind() == ErrorKind::AddrInUse => {
let PathOccupant::Socket(stale_socket) = path_occupant(path)? else {
return Err(io::Error::new(
ErrorKind::AddrInUse,
format!(
"cannot bind observer socket at {}: path exists and is not a socket",
path.display()
),
));
};
match probe_live(path) {
Ok(true) => {
return Err(io::Error::new(
ErrorKind::AddrInUse,
format!(
"another varta-watch is already running at {}",
path.display()
),
));
}
Ok(false) => {
match path_occupant(path)? {
PathOccupant::Socket(current) if current == stale_socket => {
std::fs::remove_file(path)?;
}
PathOccupant::Missing => {}
PathOccupant::Socket(_) => {
return Err(io::Error::new(
ErrorKind::AddrInUse,
format!(
"observer socket path changed while probing {}; retry bind",
path.display()
),
));
}
PathOccupant::Other => {
return Err(io::Error::new(
ErrorKind::AddrInUse,
format!(
"cannot bind observer socket at {}: path exists and is not a socket",
path.display()
),
));
}
}
let _umask_guard = UmaskGuard(unsafe { umask(restrict_umask) });
let sock = UnixDatagram::bind(path)?;
std::fs::set_permissions(
path,
std::fs::Permissions::from_mode(socket_mode),
)?;
return Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes);
}
Err(e) => {
return Err(io::Error::new(
e.kind(),
format!("cannot probe socket at {}: {e}", path.display()),
));
}
}
}
Err(e) => return Err(e),
};
std::fs::set_permissions(path, std::fs::Permissions::from_mode(socket_mode))?;
Self::finish_bind(sock, owned_path, read_timeout, uds_rcvbuf_bytes)
}
fn finish_bind(
sock: UnixDatagram,
path: PathBuf,
read_timeout: Duration,
uds_rcvbuf_bytes: u32,
) -> io::Result<Self> {
use std::os::unix::fs::MetadataExt;
sock.set_read_timeout(Some(read_timeout))?;
let raw_fd = sock.as_raw_fd();
peer_cred::enable_credential_passing(raw_fd)?;
let meta = std::fs::metadata(&path)?;
let bound_dev = meta.dev();
let bound_ino = meta.ino();
if let Err(e) = fsync_parent_dir(&path) {
crate::varta_warn!(
"uds bind: parent-directory fsync failed (durability degraded): {e}"
);
DIR_FSYNC_FAILED.fetch_add(1, Ordering::Relaxed);
}
let granted_rcvbuf = if uds_rcvbuf_bytes > 0 {
set_rcvbuf(raw_fd, uds_rcvbuf_bytes).unwrap_or(0)
} else {
0
};
Ok(UdsListener {
sock,
path,
bound_dev,
bound_ino,
truncated_count: 0,
rcvbuf_bytes: granted_rcvbuf,
})
}
}
impl BeatListener for UdsListener {
    /// Receives one credential-checked datagram, counting short reads before passing them on.
    fn recv(&mut self) -> RecvResult {
        let outcome = peer_cred::recv_authenticated(self.sock.as_raw_fd());
        if matches!(outcome, RecvResult::ShortRead) {
            self.truncated_count = self.truncated_count.wrapping_add(1);
        }
        outcome
    }

    /// Returns the number of short reads since the last call and resets the counter.
    fn drain_truncated(&mut self) -> u64 {
        std::mem::take(&mut self.truncated_count)
    }
}
impl Drop for UdsListener {
    /// Unlinks the socket file, but only if `path` still names the exact inode this listener
    /// bound. Another process may have replaced it since.
    fn drop(&mut self) {
        use std::os::unix::fs::MetadataExt;

        // `symlink_metadata` (as in `path_occupant`) inspects `path` itself. A symlink planted
        // at `path` is therefore never mistaken for our socket and removed.
        let Ok(meta) = std::fs::symlink_metadata(&self.path) else {
            return;
        };
        if meta.dev() == self.bound_dev && meta.ino() == self.bound_ino {
            // Best-effort: an error during drop cannot be reported usefully.
            let _ = std::fs::remove_file(&self.path);
        }
    }
}
impl UdsListener {
    /// Receive-buffer size reported by the kernel after applying the requested `SO_RCVBUF`.
    /// Returns `0` if no size was requested or applying it failed.
    pub fn rcvbuf_bytes(&self) -> u32 {
        let Self { rcvbuf_bytes, .. } = self;
        *rcvbuf_bytes
    }
}
/// Fsyncs the directory containing `path` so a newly created directory entry is durable.
///
/// `Path::parent` returns `Some("")` for a bare relative file name such as `"watch.sock"`.
/// That empty component, like a path with no parent at all, is treated as the current
/// directory instead of being passed to `open("")`, which always fails.
///
/// # Errors
/// Returns any error from opening the directory or from `fsync`.
fn fsync_parent_dir(path: &Path) -> io::Result<()> {
    let parent = match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => Path::new("."),
    };
    std::fs::File::open(parent)?.sync_all()
}
/// Requests a kernel receive buffer of `bytes` on `fd`.
///
/// Returns the size the kernel actually granted, as read back with
/// `getsockopt(SO_RCVBUF)`. The option is a C `int`, so requests above `i32::MAX` are
/// clamped to `i32::MAX` instead of wrapping to a negative value. The granted size may differ
/// from the request: the kernel may cap it, and Linux socket(7) documents that the value
/// read back is doubled. A negative read-back is reported as 0.
///
/// # Errors
/// Returns the OS error if either `setsockopt` or `getsockopt` fails.
fn set_rcvbuf(fd: i32, bytes: u32) -> io::Result<u32> {
    use core::ffi::c_void;
    use core::mem;

    const INT_LEN: u32 = mem::size_of::<i32>() as u32;

    let requested = i32::try_from(bytes).unwrap_or(i32::MAX);
    // SAFETY: `requested` is a live, aligned `i32` and `INT_LEN` is exactly its size, so the
    // kernel reads only valid memory.
    let ret = unsafe {
        setsockopt(
            fd,
            SOL_SOCKET,
            SO_RCVBUF,
            (&requested as *const i32).cast::<c_void>(),
            INT_LEN,
        )
    };
    if ret != 0 {
        return Err(io::Error::last_os_error());
    }

    let mut granted: i32 = 0;
    let mut optlen = INT_LEN;
    // SAFETY: `granted` is a live, aligned `i32` and `optlen` tells the kernel it may write at
    // most `INT_LEN` bytes into it.
    let ret = unsafe {
        getsockopt(
            fd,
            SOL_SOCKET,
            SO_RCVBUF,
            (&mut granted as *mut i32).cast::<c_void>(),
            &mut optlen,
        )
    };
    if ret != 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(u32::try_from(granted).unwrap_or(0))
}
/// Filesystem identity `(st_dev, st_ino)` of a socket inode.
#[derive(Clone, Copy, Eq, PartialEq)]
struct SocketIdentity {
    dev: u64,
    ino: u64,
}

/// What currently occupies a filesystem path.
enum PathOccupant {
    /// Nothing exists at the path.
    Missing,
    /// A Unix socket inode with the given identity.
    Socket(SocketIdentity),
    /// Anything else (regular file, directory, symlink, ...).
    Other,
}

/// Classifies `path` without following a symlink at its final component.
///
/// # Errors
/// Propagates any `lstat` error other than `NotFound`.
fn path_occupant(path: &Path) -> io::Result<PathOccupant> {
    use std::os::unix::fs::{FileTypeExt, MetadataExt};

    let meta = match std::fs::symlink_metadata(path) {
        Ok(meta) => meta,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(PathOccupant::Missing),
        Err(err) => return Err(err),
    };
    if !meta.file_type().is_socket() {
        return Ok(PathOccupant::Other);
    }
    let identity = SocketIdentity {
        dev: meta.dev(),
        ino: meta.ino(),
    };
    Ok(PathOccupant::Socket(identity))
}
/// Reports whether a live process is receiving on the Unix datagram socket at `path`.
///
/// Connects an unbound socket to `path` and sends an empty datagram.
/// * `Ok(true)`: the send succeeded, or it failed with `WouldBlock` because the
///   receiver's queue is full, which still proves a live receiver.
/// * `Ok(false)`: connecting or sending failed for any other reason (e.g. no listener).
///
/// # Errors
/// Returns `PermissionDenied` from connect/send, or any error creating or configuring the
/// probe socket.
fn probe_live(path: &Path) -> io::Result<bool> {
    let sock = UnixDatagram::unbound()?;
    // A blocking send to a live peer whose receive queue is full would stall `bind`
    // indefinitely. In non-blocking mode it fails fast with WouldBlock instead.
    sock.set_nonblocking(true)?;
    if let Err(e) = sock.connect(path) {
        return match e.kind() {
            ErrorKind::PermissionDenied => Err(e),
            _ => Ok(false),
        };
    }
    match sock.send(&[]) {
        Ok(_) => Ok(true),
        Err(e) => match e.kind() {
            ErrorKind::WouldBlock => Ok(true),
            ErrorKind::PermissionDenied => Err(e),
            _ => Ok(false),
        },
    }
}
#[cfg(feature = "unsafe-plaintext-udp")]
mod udp_impl {
    use std::io;
    use std::net::{SocketAddr, UdpSocket};

    use crate::peer_cred::{BeatOrigin, RecvResult};

    use super::BeatListener;

    /// Exact size in bytes of one beat frame on the wire.
    const FRAME_LEN: usize = 32;

    /// Plaintext, unauthenticated UDP beat listener (feature `unsafe-plaintext-udp`).
    ///
    /// The socket is non-blocking. No kernel-attested sender credentials are available, so
    /// accepted frames report `peer_pid` and `peer_uid` of 0 and no PID-namespace inode. The
    /// `BeatOrigin` is derived from the configured recovery trust.
    pub struct UdpListener {
        sock: UdpSocket,
        /// Datagrams discarded since the last drain because they were not exactly
        /// `FRAME_LEN` bytes (too short or too long).
        truncated_count: u64,
        recovery_trust: super::TransportTrust,
    }

    impl UdpListener {
        /// Binds a non-blocking UDP socket at `addr` with `TransportTrust::Untrusted`.
        ///
        /// # Errors
        /// Propagates errors from `bind` or from switching to non-blocking mode.
        pub fn bind(addr: SocketAddr) -> io::Result<Self> {
            let sock = UdpSocket::bind(addr)?;
            sock.set_nonblocking(true)?;
            Ok(UdpListener {
                sock,
                truncated_count: 0,
                recovery_trust: super::TransportTrust::Untrusted,
            })
        }

        /// Sets the trust level used to pick the `BeatOrigin` of received beats.
        pub fn with_recovery_trust(mut self, trust: super::TransportTrust) -> Self {
            self.recovery_trust = trust;
            self
        }
    }

    impl BeatListener for UdpListener {
        /// Returns the next datagram that is exactly `FRAME_LEN` bytes long.
        ///
        /// Datagrams of any other size are skipped and counted. The method returns
        /// `WouldBlock` once the socket is drained (or times out), retries on `Interrupted`,
        /// and surfaces any other I/O error.
        fn recv(&mut self) -> RecvResult {
            let origin = match self.recovery_trust {
                super::TransportTrust::Operator => BeatOrigin::OperatorAttestedTransport,
                super::TransportTrust::Untrusted => BeatOrigin::NetworkUnverified,
            };
            // One spare byte: `recv` copies at most `buf.len()` bytes, silently discards the
            // rest of a larger datagram, and returns only the copied length. With a buffer of
            // exactly FRAME_LEN, an oversized datagram would look like a valid frame. With
            // FRAME_LEN + 1 it reads back as FRAME_LEN + 1 and is rejected.
            let mut buf = [0u8; FRAME_LEN + 1];
            loop {
                match self.sock.recv(&mut buf) {
                    Ok(FRAME_LEN) => {
                        let mut data = [0u8; FRAME_LEN];
                        data.copy_from_slice(&buf[..FRAME_LEN]);
                        return RecvResult::Authenticated {
                            peer_pid: 0,
                            peer_uid: 0,
                            peer_pid_ns_inode: None,
                            origin,
                            data,
                        };
                    }
                    Ok(_) => {
                        // Wrong-sized datagram: count it and keep draining.
                        self.truncated_count = self.truncated_count.wrapping_add(1);
                    }
                    Err(e) => match e.kind() {
                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => {
                            return RecvResult::WouldBlock;
                        }
                        io::ErrorKind::Interrupted => {}
                        _ => return RecvResult::IoError(e),
                    },
                }
            }
        }

        /// Returns the number of wrong-sized datagrams since the last call and resets it.
        fn drain_truncated(&mut self) -> u64 {
            std::mem::take(&mut self.truncated_count)
        }
    }
}
#[cfg(feature = "unsafe-plaintext-udp")]
pub use udp_impl::UdpListener;