use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
const HISTORY_CAPACITY: usize = 1024;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
Create,
Modify,
Delete,
Rename,
}
impl FileOp {
#[must_use]
pub fn from_notify_kind(kind: notify::EventKind) -> Self {
match kind {
notify::EventKind::Create(_) => Self::Create,
notify::EventKind::Remove(_) => Self::Delete,
_ => Self::Modify,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
#[serde(rename = "p")]
pub path: PathBuf,
#[serde(rename = "m")]
pub mtime: u64,
#[serde(rename = "o")]
pub op: FileOp,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
Status {
pid: u32,
status: String,
files: usize,
},
FilesChanged {
batch: Vec<FileChange>,
#[serde(rename = "ts")]
timestamp: u64,
},
QueryResult {
id: u64,
status: String,
files: usize,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
changes_since: Vec<FileChange>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
StatusQuery,
HistoryQuery {
since: u64,
id: u64,
},
}
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
#[error("daemon socket I/O: {0}")]
Io(#[from] std::io::Error),
#[error("daemon socket JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("daemon socket path resolution failed")]
PathResolution,
}
type Result<T> = std::result::Result<T, DaemonSockError>;
#[must_use]
pub fn socket_path(root: &Path) -> PathBuf {
let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
let hash = format!(
"{:016x}",
xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
);
if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
let dir = PathBuf::from(xdg).join("ixd");
return dir.join(format!("{hash}.sock"));
}
if let Ok(home) = std::env::var("HOME") {
let dir = PathBuf::from(home).join(".local/run/ixd");
return dir.join(format!("{hash}.sock"));
}
let uid = unsafe { libc::getuid() };
PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
}
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
Ok(())
}
struct History {
entries: VecDeque<(u64, Vec<FileChange>)>,
}
impl History {
fn new() -> Self {
Self {
entries: VecDeque::with_capacity(HISTORY_CAPACITY),
}
}
fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
if self.entries.len() >= HISTORY_CAPACITY {
self.entries.pop_front();
}
self.entries.push_back((timestamp, changes));
}
fn since(&self, cutoff: u64) -> Vec<FileChange> {
self.entries
.iter()
.filter(|(ts, _)| *ts > cutoff)
.flat_map(|(_, changes)| changes.iter().cloned())
.collect()
}
}
struct Shared {
clients: Vec<ClientConn>,
history: History,
pid: u32,
status: String,
files_count: usize,
}
struct ClientConn {
stream: UnixStream,
}
impl ClientConn {
fn send(&mut self, msg: &ServerMessage) -> bool {
let Ok(mut line) = serde_json::to_string(msg) else {
return false;
};
line.push('\n');
self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
}
}
pub struct DaemonServer {
shared: Arc<Mutex<Shared>>,
listener: UnixListener,
socket_path: PathBuf,
accept_handle: Option<std::thread::JoinHandle<()>>,
running: Arc<std::sync::atomic::AtomicBool>,
}
impl DaemonServer {
pub fn new(root: &Path) -> Result<Self> {
let sp = socket_path(root);
ensure_socket_dir(&sp)?;
let _ = std::fs::remove_file(&sp);
let listener = UnixListener::bind(&sp)?;
let pid = std::process::id();
let shared = Arc::new(Mutex::new(Shared {
clients: Vec::new(),
history: History::new(),
pid,
status: "idle".to_string(),
files_count: 0,
}));
let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
Ok(Self {
shared,
listener,
socket_path: sp,
accept_handle: None,
running,
})
}
#[must_use]
pub fn path(&self) -> &Path {
&self.socket_path
}
pub fn start(&mut self) -> Result<()> {
let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
let shared = Arc::clone(&self.shared);
let running = Arc::clone(&self.running);
let handle = std::thread::Builder::new()
.name("ixd-sock-accept".to_string())
.spawn(move || {
if let Err(e) = listener.set_nonblocking(true) {
tracing::error!("ixd: cannot set nonblocking: {e}");
return;
}
while running.load(std::sync::atomic::Ordering::SeqCst) {
match listener.accept() {
Ok((stream, _)) => {
if let Err(e) = stream.set_nonblocking(false) {
tracing::warn!("ixd: cannot set blocking on client: {e}");
continue;
}
let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
let read_stream = match stream.try_clone() {
Ok(s) => s,
Err(e) => {
tracing::warn!("ixd: cannot clone stream: {e}");
continue;
}
};
let shared_clone = Arc::clone(&shared);
let running_clone = Arc::clone(&running);
if let Err(e) = std::thread::Builder::new()
.name("ixd-sock-client".to_string())
.spawn(move || {
client_read_loop(&read_stream, &shared_clone, &running_clone);
})
{
tracing::warn!("ixd: failed to spawn client thread: {e}");
continue;
}
let conn = ClientConn { stream };
if let Ok(mut s) = shared.lock() {
s.clients.push(conn);
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(e) => {
tracing::warn!("ixd: accept error: {e}");
std::thread::sleep(std::time::Duration::from_millis(200));
}
}
}
})
.map_err(DaemonSockError::Io)?;
self.accept_handle = Some(handle);
Ok(())
}
pub fn broadcast(&self, msg: &ServerMessage) {
let Ok(mut s) = self.shared.lock() else {
return;
};
s.clients.retain_mut(|c| c.send(msg));
}
pub fn set_status(&self, status: &str, files_count: usize) {
if let Ok(mut s) = self.shared.lock() {
s.status = status.to_string();
s.files_count = files_count;
}
}
pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
let timestamp = now_secs();
if let Ok(mut s) = self.shared.lock() {
s.history.push(timestamp, changes.clone());
s.files_count = files_count;
}
let msg = ServerMessage::FilesChanged {
batch: changes,
timestamp,
};
self.broadcast(&msg);
}
}
fn client_read_loop(
stream: &UnixStream,
shared: &Arc<Mutex<Shared>>,
running: &Arc<std::sync::atomic::AtomicBool>,
) {
let reader = BufReader::new(stream);
for line in reader.lines() {
if !running.load(std::sync::atomic::Ordering::SeqCst) {
break;
}
let Ok(line) = line else { break };
let msg: ClientMessage = match serde_json::from_str(&line) {
Ok(m) => m,
Err(e) => {
tracing::debug!("ixd: malformed client message: {e}");
continue;
}
};
let response = match msg {
ClientMessage::StatusQuery => {
let Ok(s) = shared.lock() else {
tracing::warn!("ixd: shared lock poisoned in status query");
continue;
};
ServerMessage::QueryResult {
id: u64::from(s.pid),
status: s.status.clone(),
files: s.files_count,
changes_since: Vec::new(),
}
}
ClientMessage::HistoryQuery { since, id } => {
let Ok(s) = shared.lock() else {
tracing::warn!("ixd: shared lock poisoned in history query");
continue;
};
let changes = s.history.since(since);
ServerMessage::QueryResult {
id,
status: s.status.clone(),
files: s.files_count,
changes_since: changes,
}
}
};
if let Ok(mut write_stream) = stream.try_clone() {
let mut line = serde_json::to_string(&response).unwrap_or_default();
line.push('\n');
let _ = write_stream.write_all(line.as_bytes());
let _ = write_stream.flush();
}
}
}
pub struct DaemonClient {
stream: BufReader<UnixStream>,
}
impl DaemonClient {
pub fn connect(root: &Path) -> Result<Self> {
let sp = socket_path(root);
let stream = UnixStream::connect(&sp)?;
Ok(Self {
stream: BufReader::new(stream),
})
}
pub fn recv(&mut self) -> Result<ServerMessage> {
let mut line = String::new();
let bytes = self.stream.read_line(&mut line)?;
if bytes == 0 {
return Err(DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"daemon closed connection",
)));
}
let msg: ServerMessage = serde_json::from_str(line.trim_end())?;
Ok(msg)
}
pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
let stream = self.stream.get_mut();
let mut line = serde_json::to_string(msg)?;
line.push('\n');
stream.write_all(line.as_bytes())?;
stream.flush()?;
Ok(())
}
}
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
impl Drop for DaemonServer {
fn drop(&mut self) {
self.running
.store(false, std::sync::atomic::Ordering::SeqCst);
if let Some(handle) = self.accept_handle.take() {
let _ = handle.join();
}
let _ = std::fs::remove_file(&self.socket_path);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[test]
fn socket_path_deterministic() {
let root = PathBuf::from("/tmp/test-project");
let p1 = socket_path(&root);
let p2 = socket_path(&root);
assert_eq!(p1, p2, "same root must produce same socket path");
}
#[test]
fn socket_path_different_roots() {
let r1 = PathBuf::from("/tmp/project-a");
let r2 = PathBuf::from("/tmp/project-b");
assert_ne!(socket_path(&r1), socket_path(&r2));
}
#[test]
fn socket_path_uses_xdg() {
unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
let p = socket_path(Path::new("/tmp/some-project"));
assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
assert!(p.extension().is_some_and(|e| e == "sock"));
unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
}
#[test]
fn server_message_ndjson_roundtrip() {
let msg = ServerMessage::Status {
pid: 1234,
status: "idle".to_string(),
files: 42,
};
let json = serde_json::to_string(&msg).expect("serialize");
assert!(json.contains("\"t\":\"status\""), "tag field present");
let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
if let ServerMessage::Status { pid, status, files } = back {
assert_eq!(pid, 1234);
assert_eq!(status, "idle");
assert_eq!(files, 42);
} else {
panic!("wrong variant after roundtrip");
}
}
#[test]
fn files_changed_roundtrip() {
let msg = ServerMessage::FilesChanged {
batch: vec![FileChange {
path: PathBuf::from("src/main.rs"),
mtime: 1_776_468_629,
op: FileOp::Modify,
}],
timestamp: 1_776_468_629,
};
let json = serde_json::to_string(&msg).expect("serialize");
let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
if let ServerMessage::FilesChanged { batch, timestamp } = back {
assert_eq!(batch.len(), 1);
assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
assert_eq!(timestamp, 1_776_468_629);
} else {
panic!("wrong variant");
}
}
#[test]
fn client_message_roundtrip() {
let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
let json = serde_json::to_string(&msg).expect("serialize");
let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
if let ClientMessage::HistoryQuery { since, id } = back {
assert_eq!(since, 1000);
assert_eq!(id, 7);
} else {
panic!("wrong variant");
}
}
#[test]
fn history_since() {
let mut h = History::new();
h.push(
100,
vec![FileChange {
path: PathBuf::from("a.rs"),
mtime: 100,
op: FileOp::Create,
}],
);
h.push(
200,
vec![FileChange {
path: PathBuf::from("b.rs"),
mtime: 200,
op: FileOp::Modify,
}],
);
h.push(
300,
vec![FileChange {
path: PathBuf::from("c.rs"),
mtime: 300,
op: FileOp::Delete,
}],
);
let changes = h.since(150);
assert_eq!(changes.len(), 2);
assert_eq!(changes[0].path, PathBuf::from("b.rs"));
assert_eq!(changes[1].path, PathBuf::from("c.rs"));
}
#[test]
fn history_capacity() {
let mut h = History::new();
for i in 0..=HISTORY_CAPACITY {
h.push(
i as u64,
vec![FileChange {
path: PathBuf::from(format!("f{i}")),
mtime: i as u64,
op: FileOp::Modify,
}],
);
}
assert_eq!(h.entries.len(), HISTORY_CAPACITY);
assert_eq!(h.entries.front().expect("non-empty").0, 1);
}
#[test]
fn server_client_connect_and_broadcast() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
let mut server = DaemonServer::new(&root).expect("create server");
let sp = server.path().to_path_buf();
server.start();
let stream = UnixStream::connect(&sp).expect("connect");
let mut client = DaemonClient {
stream: BufReader::new(stream),
};
std::thread::sleep(std::time::Duration::from_millis(200));
server.set_status("idle", 10);
server.broadcast(&ServerMessage::Status {
pid: 1234,
status: "idle".to_string(),
files: 10,
});
client
.stream
.get_mut()
.set_read_timeout(Some(std::time::Duration::from_secs(2)))
.expect("set timeout");
match client.recv() {
Ok(ServerMessage::Status { pid, status, files }) => {
assert_eq!(pid, 1234);
assert_eq!(status, "idle");
assert_eq!(files, 10);
}
Ok(other) => panic!("expected Status, got {other:?}"),
Err(e) => panic!("recv failed: {e}"),
}
}
#[test]
fn client_query_status() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
let mut server = DaemonServer::new(&root).expect("create server");
let sp = server.path().to_path_buf();
server.start();
server.set_status("indexing", 99);
let stream = UnixStream::connect(&sp).expect("connect");
let mut client = DaemonClient {
stream: BufReader::new(stream),
};
std::thread::sleep(std::time::Duration::from_millis(200));
client
.send(&ClientMessage::StatusQuery)
.expect("send query");
client
.stream
.get_mut()
.set_read_timeout(Some(std::time::Duration::from_secs(2)))
.expect("set timeout");
match client.recv() {
Ok(ServerMessage::QueryResult {
id: _,
status,
files,
changes_since,
}) => {
assert_eq!(status, "indexing");
assert_eq!(files, 99);
assert!(changes_since.is_empty());
}
Ok(other) => panic!("expected QueryResult, got {other:?}"),
Err(e) => panic!("recv failed: {e}"),
}
}
}