use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::vortix_core::engine::EngineHandle;
use crate::vortix_core::ipc::{
decode_frame, encode_frame, FrameError, IpcError, IpcOp, IpcRequest, IpcResponse, IpcResult,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
pub struct DaemonServer {
socket_path: PathBuf,
listener: UnixListener,
engine_handle: Option<Arc<EngineHandle>>,
daemon_uid: u32,
}
impl DaemonServer {
pub fn bind(socket_path: PathBuf) -> std::io::Result<Self> {
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&socket_path)?.permissions();
perms.set_mode(0o600);
std::fs::set_permissions(&socket_path, perms)?;
}
#[allow(unsafe_code)]
let daemon_uid = unsafe { libc::geteuid() };
Ok(Self {
socket_path,
listener,
engine_handle: None,
daemon_uid,
})
}
#[must_use]
pub fn with_engine_handle(mut self, handle: EngineHandle) -> Self {
self.engine_handle = Some(Arc::new(handle));
self
}
#[must_use]
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
#[must_use]
pub fn daemon_uid(&self) -> u32 {
self.daemon_uid
}
pub async fn run(self) -> std::io::Result<()> {
eprintln!("vortix daemon: listening on {}", self.socket_path.display());
if self.engine_handle.is_none() {
tracing::warn!(
"daemon started without an engine handle — Execute/Snapshot/Subscribe will return Internal errors"
);
}
let daemon_uid = self.daemon_uid;
loop {
match self.listener.accept().await {
Ok((stream, _addr)) => {
let handle = self.engine_handle.clone();
if let Err(e) = handle_client(stream, daemon_uid, handle).await {
eprintln!("vortix daemon: client session ended: {e}");
}
}
Err(e) => {
eprintln!("vortix daemon: accept failed: {e}");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
}
}
impl Drop for DaemonServer {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.socket_path);
}
}
async fn handle_client(
mut stream: UnixStream,
daemon_uid: u32,
engine_handle: Option<Arc<EngineHandle>>,
) -> Result<(), DaemonError> {
match get_peer_uid(&stream) {
Ok(peer_uid) if peer_uid == daemon_uid => { }
Ok(peer_uid) => {
tracing::warn!(peer_uid, daemon_uid, "rejecting client with UID mismatch");
let resp = IpcResponse {
id: 0,
result: Err(IpcError::Unauthorized),
};
if let Ok(frame) = encode_frame(&resp) {
let _ = stream.write_all(&frame).await;
let _ = stream.shutdown().await;
}
return Ok(());
}
Err(e) => {
tracing::warn!(error = %e, "peer-UID lookup failed; closing connection");
let resp = IpcResponse {
id: 0,
result: Err(IpcError::Internal(format!("peer-UID lookup failed: {e}"))),
};
if let Ok(frame) = encode_frame(&resp) {
let _ = stream.write_all(&frame).await;
let _ = stream.shutdown().await;
}
return Ok(());
}
}
let mut buf = Vec::with_capacity(4096);
let mut read_pos = 0usize;
loop {
let mut chunk = [0u8; 4096];
let n = stream.read(&mut chunk).await?;
if n == 0 {
return Ok(());
}
buf.extend_from_slice(&chunk[..n]);
loop {
match decode_frame::<IpcRequest>(&buf[read_pos..]) {
Ok(None) => break, Ok(Some((req, consumed))) => {
read_pos += consumed;
let resp = dispatch(req, engine_handle.as_deref()).await;
let frame = encode_frame(&resp).map_err(DaemonError::Frame)?;
stream.write_all(&frame).await?;
}
Err(e) => return Err(DaemonError::Frame(e)),
}
}
if read_pos > 0 && read_pos >= buf.len() / 2 {
buf.drain(..read_pos);
read_pos = 0;
}
}
}
#[cfg(unix)]
#[allow(unsafe_code)]
fn get_peer_uid(stream: &UnixStream) -> std::io::Result<u32> {
use std::os::unix::io::AsRawFd;
let fd = stream.as_raw_fd();
#[cfg(target_os = "linux")]
{
unsafe {
let mut cred: libc::ucred = std::mem::zeroed();
let mut len = libc::socklen_t::try_from(std::mem::size_of::<libc::ucred>()).expect(
"ucred size fits in socklen_t (a small POD struct on every supported target)",
);
let rc = libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_PEERCRED,
std::ptr::addr_of_mut!(cred).cast::<libc::c_void>(),
std::ptr::from_mut(&mut len),
);
if rc != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(cred.uid)
}
}
#[cfg(target_os = "macos")]
{
unsafe {
let mut uid: libc::uid_t = 0;
let mut gid: libc::gid_t = 0;
let rc = libc::getpeereid(
fd,
std::ptr::from_mut(&mut uid),
std::ptr::from_mut(&mut gid),
);
if rc != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(uid)
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = fd;
Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"peer-UID lookup not supported on this unix variant",
))
}
}
async fn dispatch(req: IpcRequest, engine_handle: Option<&EngineHandle>) -> IpcResponse {
let result = match req.op {
IpcOp::Execute(cmd) => match engine_handle {
Some(h) => match h.execute_command(cmd).await {
Ok(_ack) => Ok(IpcResult::Accepted),
Err(e) => Err(IpcError::Internal(format!("engine error: {e}"))),
},
None => Err(IpcError::Internal(
"engine handle not initialized in daemon".into(),
)),
},
IpcOp::Snapshot => match engine_handle {
Some(h) => match h.snapshot().await {
Ok(snap) => Ok(IpcResult::Snapshot { state: snap.state }),
Err(e) => Err(IpcError::Internal(format!("snapshot error: {e}"))),
},
None => Err(IpcError::Internal(
"engine handle not initialized in daemon".into(),
)),
},
IpcOp::Subscribe => {
if engine_handle.is_some() {
tracing::warn!(
"daemon: Subscribe acknowledged but streaming half is not yet implemented — clients should poll Snapshot until the streaming unit lands"
);
Ok(IpcResult::Subscribed)
} else {
Err(IpcError::Internal(
"engine handle not initialized in daemon".into(),
))
}
}
IpcOp::Shutdown => Ok(IpcResult::ShuttingDown),
};
IpcResponse { id: req.id, result }
}
#[derive(Debug)]
pub enum DaemonError {
Io(std::io::Error),
Frame(FrameError),
}
impl std::fmt::Display for DaemonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "IO error on client session: {e}"),
Self::Frame(e) => write!(f, "frame protocol error: {e}"),
}
}
}
impl std::error::Error for DaemonError {}
impl From<std::io::Error> for DaemonError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
#[cfg(all(test, unix))]
#[allow(unsafe_code)]
mod tests {
use super::*;
use crate::vortix_core::engine::input::UserCommand;
use crate::vortix_core::engine::state::Connection;
use crate::vortix_core::profile::ProfileId;
use tokio::net::UnixStream as TokioUnixStream;
#[tokio::test]
async fn dispatch_execute_without_handle_returns_internal_error() {
let req = IpcRequest {
id: 1,
op: IpcOp::Execute(UserCommand::Connect {
profile_id: ProfileId::new("corp"),
}),
};
let resp = dispatch(req, None).await;
assert_eq!(resp.id, 1);
match resp.result {
Err(IpcError::Internal(msg)) => assert!(msg.contains("engine handle not initialized")),
other => panic!("expected Internal error, got {other:?}"),
}
}
#[tokio::test]
async fn dispatch_snapshot_without_handle_returns_internal_error() {
let req = IpcRequest {
id: 2,
op: IpcOp::Snapshot,
};
let resp = dispatch(req, None).await;
assert_eq!(resp.id, 2);
assert!(matches!(resp.result, Err(IpcError::Internal(_))));
}
#[tokio::test]
async fn dispatch_subscribe_without_handle_returns_internal_error() {
let req = IpcRequest {
id: 3,
op: IpcOp::Subscribe,
};
let resp = dispatch(req, None).await;
assert_eq!(resp.id, 3);
assert!(matches!(resp.result, Err(IpcError::Internal(_))));
}
#[tokio::test]
async fn dispatch_shutdown_does_not_require_engine_handle() {
let req = IpcRequest {
id: 4,
op: IpcOp::Shutdown,
};
let resp = dispatch(req, None).await;
assert_eq!(resp.id, 4);
assert!(matches!(resp.result, Ok(IpcResult::ShuttingDown)));
}
#[tokio::test]
async fn dispatch_snapshot_with_handle_returns_disconnected_initially() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 5,
op: IpcOp::Snapshot,
};
let resp = dispatch(req, Some(&handle)).await;
match resp.result {
Ok(IpcResult::Snapshot { state }) => {
assert!(matches!(state, Connection::Disconnected { .. }));
}
other => panic!("expected Snapshot, got {other:?}"),
}
}
#[tokio::test]
async fn dispatch_execute_connect_with_handle_returns_accepted() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 6,
op: IpcOp::Execute(UserCommand::Connect {
profile_id: ProfileId::new("corp"),
}),
};
let resp = dispatch(req, Some(&handle)).await;
assert!(matches!(resp.result, Ok(IpcResult::Accepted)));
}
#[tokio::test]
async fn dispatch_subscribe_with_handle_returns_subscribed_ack() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 7,
op: IpcOp::Subscribe,
};
let resp = dispatch(req, Some(&handle)).await;
assert!(matches!(resp.result, Ok(IpcResult::Subscribed)));
}
fn fresh_socket_path() -> PathBuf {
let mut p = std::env::temp_dir();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
p.push(format!("vortix-test-{}-{nanos}.sock", std::process::id()));
p
}
#[tokio::test]
async fn peer_uid_matches_daemon_uid_for_same_process() {
let socket = fresh_socket_path();
let server = DaemonServer::bind(socket.clone()).expect("bind");
let daemon_uid = server.daemon_uid();
let process_uid = unsafe { libc::geteuid() };
assert_eq!(daemon_uid, process_uid, "daemon UID captured correctly");
let handle = tokio::spawn(server.run());
let mut client = loop {
match TokioUnixStream::connect(&socket).await {
Ok(s) => break s,
Err(_) => tokio::time::sleep(std::time::Duration::from_millis(10)).await,
}
};
let req = IpcRequest {
id: 7,
op: IpcOp::Shutdown,
};
let frame = encode_frame(&req).expect("encode");
client.write_all(&frame).await.expect("write");
let mut buf = vec![0u8; 4096];
let n = client.read(&mut buf).await.expect("read");
let (resp, _) = decode_frame::<IpcResponse>(&buf[..n])
.expect("decode ok")
.expect("complete frame");
assert_eq!(resp.id, 7);
assert!(matches!(resp.result, Ok(IpcResult::ShuttingDown)));
handle.abort();
let _ = std::fs::remove_file(&socket);
}
#[tokio::test]
async fn unauthorized_path_emits_unauthorized_frame_without_dispatch() {
let (server_end, mut client_end) = TokioUnixStream::pair().expect("socketpair");
let fake_daemon_uid = u32::MAX;
let server_task =
tokio::spawn(async move { handle_client(server_end, fake_daemon_uid, None).await });
let mut buf = vec![0u8; 4096];
let n = client_end.read(&mut buf).await.expect("read");
let (resp, _) = decode_frame::<IpcResponse>(&buf[..n])
.expect("decode ok")
.expect("complete frame");
assert_eq!(resp.id, 0, "unauthorized frame uses id=0");
assert!(matches!(resp.result, Err(IpcError::Unauthorized)));
let outcome = server_task.await.expect("join");
assert!(outcome.is_ok());
}
#[tokio::test]
async fn get_peer_uid_returns_current_process_uid_on_socketpair() {
let (a, _b) = TokioUnixStream::pair().expect("socketpair");
let uid = get_peer_uid(&a).expect("peer uid lookup");
let me = unsafe { libc::geteuid() };
assert_eq!(uid, me);
}
#[tokio::test]
async fn dispatch_execute_disconnect_all_routes_through_engine_handle() {
let handle = EngineHandle::for_test();
let connect_req = IpcRequest {
id: 10,
op: IpcOp::Execute(UserCommand::Connect {
profile_id: ProfileId::new("corp"),
}),
};
let _ = dispatch(connect_req, Some(&handle)).await;
let req = IpcRequest {
id: 11,
op: IpcOp::Execute(UserCommand::Disconnect { profile_id: None }),
};
let resp = dispatch(req, Some(&handle)).await;
assert_eq!(resp.id, 11);
assert!(matches!(resp.result, Ok(IpcResult::Accepted)));
}
#[tokio::test]
async fn dispatch_execute_disconnect_specific_routes_through_engine_handle() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 12,
op: IpcOp::Execute(UserCommand::Disconnect {
profile_id: Some(ProfileId::new("corp")),
}),
};
let resp = dispatch(req, Some(&handle)).await;
assert!(matches!(resp.result, Ok(IpcResult::Accepted)));
}
#[tokio::test]
async fn dispatch_execute_reconnect_all_routes_through_engine_handle() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 13,
op: IpcOp::Execute(UserCommand::Reconnect { profile_id: None }),
};
let resp = dispatch(req, Some(&handle)).await;
assert!(matches!(resp.result, Ok(IpcResult::Accepted)));
}
#[tokio::test]
async fn dispatch_execute_force_disconnect_specific_routes_through_engine_handle() {
let handle = EngineHandle::for_test();
let req = IpcRequest {
id: 14,
op: IpcOp::Execute(UserCommand::ForceDisconnect {
profile_id: Some(ProfileId::new("corp")),
}),
};
let resp = dispatch(req, Some(&handle)).await;
assert!(matches!(resp.result, Ok(IpcResult::Accepted)));
}
#[test]
fn v1_disconnect_unit_form_does_not_decode_against_v2_op() {
let v1_envelope = r#"{"kind":"execute","Execute":"Disconnect"}"#;
let parsed: Result<IpcOp, _> = serde_json::from_str(v1_envelope);
assert!(
parsed.is_err(),
"v1 unit-variant Disconnect should be rejected by v2 IpcOp decoder, got {parsed:?}"
);
}
#[test]
fn v2_disconnect_struct_form_round_trips_through_ipc_op() {
let op = IpcOp::Execute(UserCommand::Disconnect { profile_id: None });
let json = serde_json::to_string(&op).expect("serialize");
let back: IpcOp = serde_json::from_str(&json).expect("deserialize");
match back {
IpcOp::Execute(UserCommand::Disconnect { profile_id: None }) => {}
other => panic!("v2 Disconnect{{None}} round-trip mismatch: {other:?}"),
}
}
#[test]
fn ipc_error_conflict_round_trips() {
use crate::vortix_core::engine::registry::Conflict;
let err = IpcError::Conflict {
conflict: Conflict::DefaultRouteTakeover {
current: ProfileId::new("corp"),
new: ProfileId::new("home"),
},
};
let json = serde_json::to_string(&err).expect("serialize");
let back: IpcError = serde_json::from_str(&json).expect("deserialize");
match back {
IpcError::Conflict {
conflict: Conflict::DefaultRouteTakeover { current, new },
} => {
assert_eq!(current.as_str(), "corp");
assert_eq!(new.as_str(), "home");
}
other => panic!("expected Conflict round-trip, got {other:?}"),
}
}
#[test]
fn ipc_result_registry_snapshot_round_trips() {
use crate::vortix_core::state::KillSwitchState;
let r = IpcResult::RegistrySnapshot {
tunnels: vec![],
primary: None,
killswitch: KillSwitchState::Disabled,
};
let json = serde_json::to_string(&r).expect("serialize");
let back: IpcResult = serde_json::from_str(&json).expect("deserialize");
match back {
IpcResult::RegistrySnapshot {
tunnels,
primary,
killswitch,
} => {
assert!(tunnels.is_empty());
assert!(primary.is_none());
assert_eq!(killswitch, KillSwitchState::Disabled);
}
other => panic!("expected RegistrySnapshot, got {other:?}"),
}
}
}