use std::path::{Path, PathBuf};
use crate::vortix_core::ipc::{
decode_frame, encode_frame, FrameError, IpcError, IpcOp, IpcRequest, IpcResponse, IpcResult,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
pub struct DaemonServer {
socket_path: PathBuf,
listener: UnixListener,
}
impl DaemonServer {
pub fn bind(socket_path: PathBuf) -> std::io::Result<Self> {
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&socket_path)?.permissions();
perms.set_mode(0o600);
std::fs::set_permissions(&socket_path, perms)?;
}
Ok(Self {
socket_path,
listener,
})
}
#[must_use]
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
pub async fn run(self) -> std::io::Result<()> {
eprintln!("vortix daemon: listening on {}", self.socket_path.display());
loop {
match self.listener.accept().await {
Ok((stream, _addr)) => {
if let Err(e) = handle_client(stream).await {
eprintln!("vortix daemon: client session ended: {e}");
}
}
Err(e) => {
eprintln!("vortix daemon: accept failed: {e}");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
}
}
impl Drop for DaemonServer {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.socket_path);
}
}
async fn handle_client(mut stream: UnixStream) -> Result<(), DaemonError> {
let mut buf = Vec::with_capacity(4096);
let mut read_pos = 0usize;
loop {
let mut chunk = [0u8; 4096];
let n = stream.read(&mut chunk).await?;
if n == 0 {
return Ok(());
}
buf.extend_from_slice(&chunk[..n]);
loop {
match decode_frame::<IpcRequest>(&buf[read_pos..]) {
Ok(None) => break, Ok(Some((req, consumed))) => {
read_pos += consumed;
let resp = dispatch(req).await;
let frame = encode_frame(&resp).map_err(DaemonError::Frame)?;
stream.write_all(&frame).await?;
}
Err(e) => return Err(DaemonError::Frame(e)),
}
}
if read_pos > 0 && read_pos >= buf.len() / 2 {
buf.drain(..read_pos);
read_pos = 0;
}
}
}
#[allow(clippy::unused_async)] async fn dispatch(req: IpcRequest) -> IpcResponse {
let result = match req.op {
IpcOp::Execute(_) | IpcOp::Snapshot | IpcOp::Subscribe => Err(IpcError::Internal(
"engine wiring not yet connected in daemon — coming in v0.3.x".into(),
)),
IpcOp::Shutdown => Ok(IpcResult::ShuttingDown),
};
IpcResponse { id: req.id, result }
}
#[derive(Debug)]
pub enum DaemonError {
Io(std::io::Error),
Frame(FrameError),
}
impl std::fmt::Display for DaemonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "IO error on client session: {e}"),
Self::Frame(e) => write!(f, "frame protocol error: {e}"),
}
}
}
impl std::error::Error for DaemonError {}
impl From<std::io::Error> for DaemonError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}