use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
/// Maximum number of change batches `History` retains for `HistoryQuery` replay;
/// older batches are dropped from the front once this cap is reached.
const HISTORY_CAPACITY: usize = 1024;
/// Coarse classification of a filesystem event, serialized as a lowercase
/// string (`"create"`, `"modify"`, `"delete"`, `"rename"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
    Create,
    Modify,
    Delete,
    // NOTE(review): `from_notify_kind` never yields `Rename`; presumably it is
    // produced by code outside this file — confirm.
    Rename,
}
impl FileOp {
    /// Collapses a raw `notify` event kind into this module's coarse
    /// categories: creates and removes map directly; everything else
    /// (data/metadata changes, access events, unknowns) counts as a modify.
    #[must_use]
    pub fn from_notify_kind(kind: notify::EventKind) -> Self {
        if matches!(kind, notify::EventKind::Create(_)) {
            Self::Create
        } else if matches!(kind, notify::EventKind::Remove(_)) {
            Self::Delete
        } else {
            Self::Modify
        }
    }
}
/// Daemon lifecycle state, serialized as an internally tagged JSON object,
/// e.g. `{"state":"indexing","entropy":3}` or `{"state":"safety_halt"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum DaemonStatus {
    /// No indexing work in progress.
    Idle,
    // NOTE(review): the semantics of `entropy` (some churn/pressure metric?)
    // are defined outside this file — confirm before documenting further.
    Indexing {
        entropy: u16,
    },
    Deferred {
        entropy: u16,
    },
    Escalated {
        entropy: u16,
    },
    /// A warning condition with a human-readable reason.
    Warned {
        reason: String,
    },
    SafetyHalt,
    SafetyExit,
}
impl std::fmt::Display for DaemonStatus {
    /// Renders the short human-readable status line the server also stores as
    /// the `status` string in query results (e.g. "indexing (entropy: 42)").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Idle => "idle".to_string(),
            Self::Indexing { entropy } => format!("indexing (entropy: {entropy})"),
            Self::Deferred { entropy } => format!("deferred (entropy: {entropy})"),
            Self::Escalated { entropy } => format!("escalated (entropy: {entropy})"),
            Self::Warned { reason } => format!("warned: {reason}"),
            Self::SafetyHalt => "safety halt".to_string(),
            Self::SafetyExit => "safety exit".to_string(),
        };
        f.write_str(&text)
    }
}
/// One observed file event; serde field names are shortened to single letters
/// to keep NDJSON batches compact on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    /// Affected path (`"p"` on the wire).
    #[serde(rename = "p")]
    pub path: PathBuf,
    /// Modification time in seconds (`"m"` on the wire).
    // NOTE(review): presumably seconds since the Unix epoch, matching the
    // `now_secs()` timestamps used elsewhere in this module — confirm.
    #[serde(rename = "m")]
    pub mtime: u64,
    /// What happened to the file (`"o"` on the wire).
    #[serde(rename = "o")]
    pub op: FileOp,
}
/// Daemon → client messages, internally tagged via `"t"` (snake_case), one
/// JSON object per line (NDJSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
    /// Unsolicited status broadcast.
    Status {
        pid: u32,
        /// Human-readable form of the status (see `Display for DaemonStatus`).
        status: String,
        /// Number of indexed files.
        files: usize,
        /// Structured status; omitted from JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
    },
    /// Push notification carrying a batch of file changes.
    FilesChanged {
        batch: Vec<FileChange>,
        /// Batch timestamp, seconds since the Unix epoch (`"ts"` on the wire).
        #[serde(rename = "ts")]
        timestamp: u64,
    },
    /// Reply to a `ClientMessage::StatusQuery` or `HistoryQuery`, echoing the
    /// request `id`.
    QueryResult {
        id: u64,
        status: String,
        files: usize,
        /// Changes newer than the query's `since` cutoff; empty (and omitted
        /// from JSON) for plain status queries.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        changes_since: Vec<FileChange>,
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
        /// When the index was last rebuilt, if known; omitted when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        last_rebuild_at: Option<u64>,
    },
}
/// Client → daemon requests, internally tagged via `"t"` (snake_case), one
/// JSON object per line (NDJSON). Both variants are answered with
/// `ServerMessage::QueryResult` carrying the same `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Ask for the current status snapshot.
    StatusQuery {
        /// Correlation id echoed in the reply; defaults to 0 if absent.
        #[serde(default)]
        id: u64,
    },
    /// Ask for all recorded changes with timestamp strictly greater than
    /// `since` (seconds since the Unix epoch).
    HistoryQuery {
        since: u64,
        id: u64,
    },
}
/// Errors surfaced by the daemon socket layer.
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
    /// Underlying socket/filesystem I/O failure.
    #[error("daemon socket I/O: {0}")]
    Io(#[from] std::io::Error),
    /// NDJSON (de)serialization failure.
    #[error("daemon socket JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The socket path for a project root could not be determined.
    // NOTE(review): no code in this view constructs this variant — presumably
    // used elsewhere; confirm.
    #[error("daemon socket path resolution failed")]
    PathResolution,
}
/// Module-local result alias over [`DaemonSockError`].
type Result<T> = std::result::Result<T, DaemonSockError>;
/// Derives a per-project socket path from `root`.
///
/// The canonicalized root (falling back to the raw path if canonicalization
/// fails, e.g. the directory does not exist) is hashed with xxh64 so distinct
/// projects get distinct, deterministic socket names. Placement preference:
/// `$XDG_RUNTIME_DIR/ixd/`, then `$HOME/.local/run/ixd/`, then a uid-scoped
/// file directly under `/tmp`.
#[must_use]
pub fn socket_path(root: &Path) -> PathBuf {
    let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
    let digest = xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0);
    let file_name = format!("{digest:016x}.sock");
    match std::env::var("XDG_RUNTIME_DIR") {
        Ok(xdg) => PathBuf::from(xdg).join("ixd").join(file_name),
        Err(_) => match std::env::var("HOME") {
            Ok(home) => PathBuf::from(home).join(".local/run/ixd").join(file_name),
            Err(_) => {
                // SAFETY: getuid(2) has no preconditions and cannot fail.
                let uid = unsafe { libc::getuid() };
                PathBuf::from(format!("/tmp/ixd-{uid}-{file_name}"))
            }
        },
    }
}
/// Creates the parent directory of `path` (and any missing ancestors).
/// A path with no parent is a no-op success.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(parent) => std::fs::create_dir_all(parent),
        None => Ok(()),
    }
}
/// Bounded in-memory log of change batches, oldest first, used to answer
/// `HistoryQuery` replays. Capacity is `HISTORY_CAPACITY` batches.
struct History {
    /// `(timestamp_secs, batch)` pairs in insertion (chronological) order.
    entries: VecDeque<(u64, Vec<FileChange>)>,
}
impl History {
    /// Empty history with capacity pre-reserved so steady-state pushes never
    /// reallocate.
    fn new() -> Self {
        Self {
            entries: VecDeque::with_capacity(HISTORY_CAPACITY),
        }
    }

    /// Appends a batch, evicting the oldest entries to stay within
    /// `HISTORY_CAPACITY`.
    fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
        while self.entries.len() >= HISTORY_CAPACITY {
            self.entries.pop_front();
        }
        self.entries.push_back((timestamp, changes));
    }

    /// All changes from batches strictly newer than `cutoff`, flattened in
    /// chronological order.
    fn since(&self, cutoff: u64) -> Vec<FileChange> {
        let mut collected = Vec::new();
        for (ts, batch) in &self.entries {
            if *ts > cutoff {
                collected.extend(batch.iter().cloned());
            }
        }
        collected
    }
}
/// State shared (behind `Arc<Mutex<_>>`) between the accept thread, per-client
/// reader threads, and the `DaemonServer` public API.
struct Shared {
    /// Live client connections; pruned when a send fails.
    clients: Vec<ClientConn>,
    /// Bounded change-batch log for `HistoryQuery`.
    history: History,
    /// Human-readable status string (mirrors `daemon_status`'s `Display` form).
    status: String,
    /// Structured status, if known.
    daemon_status: Option<DaemonStatus>,
    /// Timestamp (secs) of the last rebuild, if any.
    last_rebuild_at: Option<u64>,
    /// Current number of indexed files.
    files_count: usize,
}
/// Write half of a connected client; reads happen on a cloned stream in a
/// dedicated per-client thread.
struct ClientConn {
    stream: UnixStream,
}
impl ClientConn {
    /// Serializes `msg` as a single NDJSON line and writes + flushes it.
    /// Returns `false` on any serialization or I/O failure so callers can
    /// drop the connection (see `retain_mut` call sites).
    fn send(&mut self, msg: &ServerMessage) -> bool {
        match serde_json::to_string(msg) {
            Ok(payload) => {
                let framed = payload + "\n";
                self.stream.write_all(framed.as_bytes()).is_ok() && self.stream.flush().is_ok()
            }
            Err(_) => false,
        }
    }
}
/// Daemon-side socket server: owns the listening socket, the accept thread,
/// and the shared state broadcast to clients. Cleans up the socket file on
/// `Drop`.
pub struct DaemonServer {
    /// State shared with the accept and per-client threads.
    shared: Arc<Mutex<Shared>>,
    listener: UnixListener,
    socket_path: PathBuf,
    /// Join handle for the accept loop, set by `start()`.
    accept_handle: Option<std::thread::JoinHandle<()>>,
    /// Cleared in `Drop` to stop the accept and client loops.
    running: Arc<std::sync::atomic::AtomicBool>,
}
impl DaemonServer {
    /// Binds a fresh Unix socket for `root`.
    ///
    /// Refuses to reuse or unlink a pre-existing path: a symlink there could
    /// redirect the bind, and an existing file may be a live daemon.
    ///
    /// # Errors
    /// `Io(AddrInUse)` if the socket path already exists or is a symlink, plus
    /// any I/O error from directory creation or `bind`.
    pub fn new(root: &Path) -> Result<Self> {
        let sp = socket_path(root);
        ensure_socket_dir(&sp)?;
        // NOTE(review): exists-check then bind is a TOCTOU window; confirm
        // this is acceptable for the threat model.
        if sp.exists() || sp.is_symlink() {
            let msg = if sp.is_symlink() {
                format!("symlink attack detected at {}", sp.display())
            } else {
                format!("socket file already exists at {}", sp.display())
            };
            return Err(DaemonSockError::Io(std::io::Error::new(
                std::io::ErrorKind::AddrInUse,
                msg,
            )));
        }
        let listener = UnixListener::bind(&sp)?;
        let shared = Arc::new(Mutex::new(Shared {
            clients: Vec::new(),
            history: History::new(),
            status: "idle".to_string(),
            daemon_status: Some(DaemonStatus::Idle),
            last_rebuild_at: None,
            files_count: 0,
        }));
        let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
        Ok(Self {
            shared,
            listener,
            socket_path: sp,
            accept_handle: None,
            running,
        })
    }

    /// Path of the bound socket file.
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.socket_path
    }

    /// Spawns the accept loop on a background thread.
    ///
    /// The listener is put in non-blocking mode so the loop can poll the
    /// `running` flag (sleeping 100ms between empty polls) and exit promptly
    /// on shutdown. Each accepted client gets: a blocking stream with a 5s
    /// write timeout, a dedicated reader thread (`client_read_loop` on a
    /// cloned stream), and an entry in `shared.clients` for broadcasts.
    ///
    /// # Errors
    /// Any I/O error from cloning the listener or spawning the thread.
    pub fn start(&mut self) -> Result<()> {
        let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
        let shared = Arc::clone(&self.shared);
        let running = Arc::clone(&self.running);
        let handle = std::thread::Builder::new()
            .name("ixd-sock-accept".to_string())
            .spawn(move || {
                if let Err(e) = listener.set_nonblocking(true) {
                    tracing::error!("ixd: cannot set nonblocking: {e}");
                    return;
                }
                while running.load(std::sync::atomic::Ordering::SeqCst) {
                    match listener.accept() {
                        Ok((stream, _)) => {
                            // Accepted streams inherit non-blocking from the
                            // listener on some platforms; force blocking so
                            // reads/writes honor their timeouts instead.
                            if let Err(e) = stream.set_nonblocking(false) {
                                tracing::warn!("ixd: cannot set blocking on client: {e}");
                                continue;
                            }
                            let _ =
                                stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
                            // Reader thread gets its own handle to the same
                            // underlying socket; `stream` stays here for writes.
                            let read_stream = match stream.try_clone() {
                                Ok(s) => s,
                                Err(e) => {
                                    tracing::warn!("ixd: cannot clone stream: {e}");
                                    continue;
                                }
                            };
                            let shared_clone = Arc::clone(&shared);
                            let running_clone = Arc::clone(&running);
                            if let Err(e) = std::thread::Builder::new()
                                .name("ixd-sock-client".to_string())
                                .spawn(move || {
                                    client_read_loop(&read_stream, &shared_clone, &running_clone);
                                })
                            {
                                tracing::warn!("ixd: failed to spawn client thread: {e}");
                                continue;
                            }
                            let conn = ClientConn { stream };
                            // Lock failure (poisoned mutex) silently drops the
                            // connection registration.
                            if let Ok(mut s) = shared.lock() {
                                s.clients.push(conn);
                            }
                        }
                        // Non-blocking accept with nothing pending: poll again
                        // after a short sleep so `running` is re-checked.
                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            std::thread::sleep(std::time::Duration::from_millis(100));
                        }
                        Err(e) => {
                            tracing::warn!("ixd: accept error: {e}");
                            std::thread::sleep(std::time::Duration::from_millis(200));
                        }
                    }
                }
            })
            .map_err(DaemonSockError::Io)?;
        self.accept_handle = Some(handle);
        Ok(())
    }

    /// Sends `msg` to every connected client, dropping clients whose send
    /// fails. A poisoned lock makes this a silent no-op.
    pub fn broadcast(&self, msg: &ServerMessage) {
        let Ok(mut s) = self.shared.lock() else {
            return;
        };
        s.clients.retain_mut(|c| c.send(msg));
    }

    /// Updates the shared status snapshot (string form + structured form) and
    /// the file count. Does not notify clients.
    pub fn set_status(&self, daemon_status: &DaemonStatus, files_count: usize) {
        if let Ok(mut s) = self.shared.lock() {
            s.status = daemon_status.to_string();
            s.daemon_status = Some(daemon_status.clone());
            s.files_count = files_count;
        }
    }

    /// Records a change batch in history, updates the file count, and pushes a
    /// `FilesChanged` message to all clients (dropping clients whose send
    /// fails).
    pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
        let timestamp = now_secs();
        if let Ok(mut s) = self.shared.lock() {
            s.history.push(timestamp, changes.clone());
            s.files_count = files_count;
            // NOTE(review): `last_rebuild_at` only advances while the daemon
            // is Idle, and the status fields are re-set to values they already
            // hold — confirm this reflects "changes applied while idle mean an
            // immediate rebuild" and is not a leftover.
            if matches!(s.daemon_status, Some(DaemonStatus::Idle)) {
                s.status = "idle".to_string();
                s.daemon_status = Some(DaemonStatus::Idle);
                s.last_rebuild_at = Some(timestamp);
            }
            let msg = ServerMessage::FilesChanged {
                batch: changes,
                timestamp,
            };
            s.clients.retain_mut(|c| c.send(&msg));
        }
    }
}
/// Per-client request loop: reads NDJSON `ClientMessage` lines and answers
/// each with a `ServerMessage::QueryResult` written back on the same socket.
///
/// The 5s read timeout bounds how long a read blocks so the `running` flag is
/// re-checked periodically even on an idle connection. The loop exits on EOF,
/// fatal I/O errors, response write/serialization failure, or shutdown.
fn client_read_loop(
    stream: &UnixStream,
    shared: &Arc<Mutex<Shared>>,
    running: &Arc<std::sync::atomic::AtomicBool>,
) {
    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
    let mut reader = BufReader::new(stream);
    let mut line_buf = String::new();
    loop {
        if !running.load(std::sync::atomic::Ordering::SeqCst) {
            break;
        }
        match reader.read_line(&mut line_buf) {
            // EOF: peer closed its end.
            Ok(0) => break,
            Ok(_) => {
                let parsed = serde_json::from_str::<ClientMessage>(&line_buf);
                // Clear only after a complete line was consumed, so a partial
                // line left in the buffer by a timed-out read (handled below)
                // is preserved and completed by the next read.
                line_buf.clear();
                let msg = match parsed {
                    Ok(m) => m,
                    Err(e) => {
                        tracing::debug!("ixd: malformed client message: {e}");
                        continue;
                    }
                };
                let response = match msg {
                    ClientMessage::StatusQuery { id } => {
                        let Ok(s) = shared.lock() else {
                            tracing::warn!("ixd: shared lock poisoned in status query");
                            continue;
                        };
                        ServerMessage::QueryResult {
                            id,
                            status: s.status.clone(),
                            files: s.files_count,
                            changes_since: Vec::new(),
                            daemon_status: s.daemon_status.clone(),
                            last_rebuild_at: s.last_rebuild_at,
                        }
                    }
                    ClientMessage::HistoryQuery { since, id } => {
                        let Ok(s) = shared.lock() else {
                            tracing::warn!("ixd: shared lock poisoned in history query");
                            continue;
                        };
                        ServerMessage::QueryResult {
                            id,
                            status: s.status.clone(),
                            files: s.files_count,
                            changes_since: s.history.since(since),
                            daemon_status: s.daemon_status.clone(),
                            last_rebuild_at: s.last_rebuild_at,
                        }
                    }
                };
                // Write on a cloned handle; the BufReader keeps `stream`
                // borrowed for reading.
                if let Ok(mut write_stream) = stream.try_clone() {
                    match serde_json::to_string(&response) {
                        Ok(mut line) => {
                            line.push('\n');
                            if write_stream.write_all(line.as_bytes()).is_err()
                                || write_stream.flush().is_err()
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            tracing::warn!("ixd: failed to serialize query response: {e}");
                            break;
                        }
                    }
                }
            }
            // BUG FIX: a blocking-socket read timeout surfaces as `WouldBlock`
            // on Unix (`TimedOut` on Windows). The previous code tolerated
            // only `TimedOut`, so every idle client was disconnected by the
            // `Err(_) => break` arm after 5 seconds. Treat both (plus EINTR)
            // as benign and keep polling.
            Err(e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::WouldBlock
                        | std::io::ErrorKind::TimedOut
                        | std::io::ErrorKind::Interrupted
                ) => {}
            Err(_) => break,
        }
    }
}
/// Client-side handle to a daemon socket: a buffered Unix stream used for both
/// sending NDJSON requests and receiving NDJSON replies/broadcasts.
pub struct DaemonClient {
    stream: BufReader<UnixStream>,
}
impl DaemonClient {
    /// Connects to the daemon socket derived from `root` (see `socket_path`)
    /// and applies 5s read and write timeouts.
    ///
    /// # Errors
    /// `DaemonSockError::Io` if the socket is missing, refused, or the
    /// timeouts cannot be set.
    pub fn connect(root: &Path) -> Result<Self> {
        let sp = socket_path(root);
        let stream = UnixStream::connect(&sp)?;
        stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
        stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
        Ok(Self {
            stream: BufReader::new(stream),
        })
    }

    /// Reads one NDJSON-framed `ServerMessage`, blocking up to the 5s read
    /// timeout.
    ///
    /// # Errors
    /// - `Io(TimedOut)` when no complete line arrives within the timeout,
    /// - `Io(UnexpectedEof)` when the daemon closes the connection,
    /// - `Json` when the line is not a valid `ServerMessage`,
    /// - `Io` for any other stream failure.
    pub fn recv(&mut self) -> Result<ServerMessage> {
        let mut line = String::new();
        let bytes = self.stream.read_line(&mut line).map_err(|e| {
            // BUG FIX: on Unix a socket read timeout surfaces as `WouldBlock`
            // (`TimedOut` is the Windows kind), so the original mapping never
            // fired on Unix. Normalize both to a descriptive `TimedOut`.
            if matches!(
                e.kind(),
                std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock
            ) {
                DaemonSockError::Io(std::io::Error::new(
                    std::io::ErrorKind::TimedOut,
                    "recv timed out after 5s",
                ))
            } else {
                DaemonSockError::Io(e)
            }
        })?;
        if bytes == 0 {
            return Err(DaemonSockError::Io(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "daemon closed connection",
            )));
        }
        // CONSISTENCY FIX: report parse failures through the dedicated `Json`
        // variant (as `send` already does via `From`), instead of wrapping
        // them in an `Io(InvalidData)` error.
        Ok(serde_json::from_str(line.trim_end())?)
    }

    /// Serializes `msg` as one NDJSON line, writes it, and flushes.
    ///
    /// # Errors
    /// `Json` on serialization failure, `Io` on write/flush failure.
    pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
        let stream = self.stream.get_mut();
        let mut line = serde_json::to_string(msg)?;
        line.push('\n');
        stream.write_all(line.as_bytes())?;
        stream.flush()?;
        Ok(())
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch; yields 0 if
/// the system clock reads before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
impl Drop for DaemonServer {
    /// Signals the accept loop to stop, waits for it to exit, then removes the
    /// socket file (best effort).
    fn drop(&mut self) {
        self.running
            .store(false, std::sync::atomic::Ordering::SeqCst);
        if let Some(accept_thread) = self.accept_handle.take() {
            accept_thread.join().ok();
        }
        std::fs::remove_file(&self.socket_path).ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // Same root must always map to the same socket path (pure hash of the
    // canonicalized path).
    #[test]
    fn socket_path_deterministic() {
        let root = PathBuf::from("/tmp/test-project");
        let p1 = socket_path(&root);
        let p2 = socket_path(&root);
        assert_eq!(p1, p2, "same root must produce same socket path");
    }

    #[test]
    fn socket_path_different_roots() {
        let r1 = PathBuf::from("/tmp/project-a");
        let r2 = PathBuf::from("/tmp/project-b");
        assert_ne!(socket_path(&r1), socket_path(&r2));
    }

    // XDG_RUNTIME_DIR takes priority over HOME and the /tmp fallback.
    // NOTE(review): mutating process-global env vars races with other tests
    // under the default parallel test runner — consider serializing these or
    // using a lock; confirm current flakiness tolerance.
    #[test]
    fn socket_path_uses_xdg() {
        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
        let p = socket_path(Path::new("/tmp/some-project"));
        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
        assert!(p.extension().is_some_and(|e| e == "sock"));
        unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
    }

    // The wire format is internally tagged ("t") and None fields are omitted.
    #[test]
    fn server_message_ndjson_roundtrip() {
        let msg = ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 42,
            daemon_status: None,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        assert!(json.contains("\"t\":\"status\""), "tag field present");
        assert!(
            !json.contains("daemon_status"),
            "daemon_status should be omitted when None"
        );
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::Status {
            pid,
            status,
            files,
            daemon_status,
        } = back
        {
            assert_eq!(pid, 1234);
            assert_eq!(status, "idle");
            assert_eq!(files, 42);
            assert_eq!(daemon_status, None);
        } else {
            panic!("wrong variant after roundtrip");
        }
    }

    #[test]
    fn files_changed_roundtrip() {
        let msg = ServerMessage::FilesChanged {
            batch: vec![FileChange {
                path: PathBuf::from("src/main.rs"),
                mtime: 1_776_468_629,
                op: FileOp::Modify,
            }],
            timestamp: 1_776_468_629,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::FilesChanged { batch, timestamp } = back {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
            assert_eq!(timestamp, 1_776_468_629);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn client_message_roundtrip() {
        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
        if let ClientMessage::HistoryQuery { since, id } = back {
            assert_eq!(since, 1000);
            assert_eq!(id, 7);
        } else {
            panic!("wrong variant");
        }
    }

    // `since` is a strict cutoff: the batch at ts == 150 would be excluded,
    // and only the ts-200 and ts-300 batches survive a cutoff of 150.
    #[test]
    fn history_since() {
        let mut h = History::new();
        h.push(
            100,
            vec![FileChange {
                path: PathBuf::from("a.rs"),
                mtime: 100,
                op: FileOp::Create,
            }],
        );
        h.push(
            200,
            vec![FileChange {
                path: PathBuf::from("b.rs"),
                mtime: 200,
                op: FileOp::Modify,
            }],
        );
        h.push(
            300,
            vec![FileChange {
                path: PathBuf::from("c.rs"),
                mtime: 300,
                op: FileOp::Delete,
            }],
        );
        let changes = h.since(150);
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
    }

    // Pushing CAPACITY+1 batches must evict exactly the oldest one (ts 0),
    // leaving the front at ts 1.
    #[test]
    fn history_capacity() {
        let mut h = History::new();
        for i in 0..=HISTORY_CAPACITY {
            h.push(
                i as u64,
                vec![FileChange {
                    path: PathBuf::from(format!("f{i}")),
                    mtime: i as u64,
                    op: FileOp::Modify,
                }],
            );
        }
        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
        assert_eq!(h.entries.front().expect("non-empty").0, 1);
    }

    // End-to-end over a real socket: connect, let the accept thread register
    // the client (the 200ms sleep gives its poll loop time), then broadcast.
    #[test]
    fn server_client_connect_and_broadcast() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        let _ = server.start();
        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };
        std::thread::sleep(std::time::Duration::from_millis(200));
        server.set_status(&DaemonStatus::Idle, 10);
        server.broadcast(&ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 10,
            daemon_status: Some(DaemonStatus::Idle),
        });
        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");
        match client.recv() {
            Ok(ServerMessage::Status {
                pid,
                status,
                files,
                daemon_status,
            }) => {
                assert_eq!(pid, 1234);
                assert_eq!(status, "idle");
                assert_eq!(files, 10);
                assert!(daemon_status.is_some());
            }
            Ok(other) => panic!("expected Status, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }

    // Request/response path: a StatusQuery must come back as a QueryResult
    // echoing the id and reflecting the status set before the query.
    #[test]
    fn client_query_status() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        let _ = server.start();
        server.set_status(&DaemonStatus::Indexing { entropy: 42 }, 99);
        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };
        std::thread::sleep(std::time::Duration::from_millis(200));
        client
            .send(&ClientMessage::StatusQuery { id: 123 })
            .expect("send query");
        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");
        match client.recv() {
            Ok(ServerMessage::QueryResult {
                id,
                status,
                files,
                changes_since,
                daemon_status,
                last_rebuild_at,
            }) => {
                eprintln!(
                    "[JSON] id={}, status={}, files={}, daemon_status={:?}, last_rebuild_at={:?}",
                    id, status, files, daemon_status, last_rebuild_at
                );
                assert_eq!(id, 123);
                assert_eq!(status, "indexing (entropy: 42)");
                assert_eq!(files, 99);
                assert!(changes_since.is_empty());
                assert_eq!(daemon_status, Some(DaemonStatus::Indexing { entropy: 42 }));
                assert_eq!(last_rebuild_at, None);
            }
            Ok(other) => panic!("expected QueryResult, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }
}