#[cfg(unix)]
mod imp {
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{mpsc, watch};
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
use crate::comms::daemon::{Broker, Session};
use crate::comms::protocol::{CommsOut, CommsRequest};
use crate::comms::transport::{CommsFrontend, CommsLink, MAX_FRAME_BYTES, PeerCred};
const CHANNEL_DEPTH: usize = 256;
const READ_CHUNK: usize = 8 * 1024;
pub struct UdsLink {
stream: UnixStream,
codec: LengthDelimitedCodec,
read_buf: BytesMut,
peer: PeerCred,
}
impl UdsLink {
fn new(stream: UnixStream, peer: PeerCred) -> Self {
let mut codec = LengthDelimitedCodec::new();
codec.set_max_frame_length(MAX_FRAME_BYTES);
Self {
stream,
codec,
read_buf: BytesMut::with_capacity(READ_CHUNK),
peer,
}
}
}
impl CommsLink for UdsLink {
async fn recv(&mut self) -> std::io::Result<Option<CommsRequest>> {
loop {
if let Some(frame) = self.codec.decode(&mut self.read_buf)? {
let req = rmp_serde::from_slice(&frame).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
return Ok(Some(req));
}
let n = self.stream.read_buf(&mut self.read_buf).await?;
if n == 0 {
if self.read_buf.is_empty() {
return Ok(None);
}
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"peer closed mid-frame",
));
}
}
}
async fn send(&mut self, out: CommsOut) -> std::io::Result<()> {
let body = rmp_serde::to_vec_named(&out)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
let mut framed = BytesMut::new();
self.codec.encode(Bytes::from(body), &mut framed)?;
self.stream.write_all(&framed).await?;
self.stream.flush().await
}
fn peer_cred(&self) -> PeerCred {
self.peer
}
}
pub struct UdsFrontend {
listener: UnixListener,
socket_path: PathBuf,
}
impl UdsFrontend {
pub fn from_listener(listener: UnixListener, socket_path: PathBuf) -> Self {
Self {
listener,
socket_path,
}
}
}
impl CommsFrontend for UdsFrontend {
async fn serve(
self: Box<Self>,
broker: Arc<Broker>,
mut shutdown: watch::Receiver<bool>,
) -> std::io::Result<()> {
broker.mark_active().await;
let my_uid = super::daemon_uid();
loop {
tokio::select! {
accepted = self.listener.accept() => {
let (stream, _addr) = match accepted {
Ok(pair) => pair,
Err(e) => {
tracing::warn!(error = %e, "comms: accept failed");
continue;
}
};
let peer = peer_cred_of(&stream);
if let Some(uid) = peer.uid && uid != my_uid {
tracing::warn!(
peer_uid = uid,
daemon_uid = my_uid,
"comms: rejecting cross-user connection"
);
continue;
}
let broker = broker.clone();
tokio::spawn(serve_uds_link(broker, UdsLink::new(stream, peer)));
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
break;
}
}
}
}
let _ = std::fs::remove_file(&self.socket_path);
Ok(())
}
}
async fn serve_uds_link(broker: Arc<Broker>, mut link: UdsLink) {
let (link_tx, mut link_rx) = mpsc::channel::<CommsOut>(CHANNEL_DEPTH);
let mut session = Session::default();
loop {
tokio::select! {
inbound = link.recv() => {
match inbound {
Ok(Some(req)) => {
let resp = broker.handle(req, &mut session, &link_tx).await;
if link.send(CommsOut::Response(resp)).await.is_err() {
break;
}
}
Ok(None) | Err(_) => break,
}
}
note = link_rx.recv() => {
match note {
Some(out) => {
if link.send(out).await.is_err() {
break;
}
}
None => break,
}
}
}
}
}
fn peer_cred_of(stream: &UnixStream) -> PeerCred {
super::peer_cred_from_fd(stream.as_raw_fd())
}
}
#[cfg(unix)]
pub use imp::{UdsFrontend, UdsLink};
#[cfg(unix)]
pub fn daemon_uid() -> u32 {
unsafe { getuid() }
}
#[cfg(not(unix))]
pub fn daemon_uid() -> u32 {
0
}
#[cfg(unix)]
unsafe extern "C" {
fn getuid() -> u32;
fn getsockopt(
sockfd: i32,
level: i32,
optname: i32,
optval: *mut core::ffi::c_void,
optlen: *mut u32,
) -> i32;
}
#[cfg(unix)]
pub(crate) fn peer_cred_from_fd(fd: i32) -> crate::comms::transport::PeerCred {
#[cfg(target_os = "linux")]
{
const SOL_SOCKET: i32 = 1;
const SO_PEERCRED: i32 = 17;
#[repr(C)]
#[derive(Default, Clone, Copy)]
struct Ucred {
pid: i32,
uid: u32,
gid: u32,
}
let mut cred = Ucred::default();
let mut len = core::mem::size_of::<Ucred>() as u32;
let rc = unsafe {
getsockopt(
fd,
SOL_SOCKET,
SO_PEERCRED,
(&mut cred as *mut Ucred).cast(),
&mut len,
)
};
if rc == 0 {
return crate::comms::transport::PeerCred {
uid: Some(cred.uid),
pid: Some(cred.pid as u32),
};
}
}
#[cfg(target_os = "macos")]
{
const SOL_LOCAL: i32 = 0;
const LOCAL_PEERCRED: i32 = 0x001;
#[repr(C)]
struct Xucred {
cr_version: u32,
cr_uid: u32,
cr_ngroups: i16,
cr_groups: [u32; 16],
}
let mut cred = Xucred {
cr_version: 0,
cr_uid: u32::MAX,
cr_ngroups: 0,
cr_groups: [0; 16],
};
let mut len = core::mem::size_of::<Xucred>() as u32;
let rc = unsafe {
getsockopt(
fd,
SOL_LOCAL,
LOCAL_PEERCRED,
(&mut cred as *mut Xucred).cast(),
&mut len,
)
};
if rc == 0 {
return crate::comms::transport::PeerCred {
uid: Some(cred.cr_uid),
pid: None,
};
}
}
crate::comms::transport::PeerCred::default()
}