1use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
/// Maximum number of change batches kept in the in-memory history; once this
/// many are stored, the oldest batch is evicted for each new one pushed.
const HISTORY_CAPACITY: usize = 1024;
52
/// Kind of filesystem operation attached to a [`FileChange`].
///
/// Serialised as the lowercase variant name (`"create"`, `"modify"`,
/// `"delete"`, `"rename"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
    /// The path was created.
    Create,
    /// The path was modified.
    Modify,
    /// The path was deleted.
    Delete,
    /// The path was renamed.
    Rename,
}
66
67impl FileOp {
68 #[must_use]
70 pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71 match kind {
72 notify::EventKind::Create(_) => Self::Create,
73 notify::EventKind::Remove(_) => Self::Delete,
74 _ => Self::Modify,
75 }
76 }
77}
78
/// One changed path as carried in change batches and history replies.
///
/// Field names are shortened on the wire: `p`, `m` and `o`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    /// Path of the affected file (wire name `p`).
    #[serde(rename = "p")]
    pub path: PathBuf,
    /// Modification time of the file (wire name `m`).
    // NOTE(review): presumably seconds since the Unix epoch — confirm against the producer.
    #[serde(rename = "m")]
    pub mtime: u64,
    /// Operation that produced this change (wire name `o`).
    #[serde(rename = "o")]
    pub op: FileOp,
}
92
/// Messages sent from the daemon to its clients.
///
/// Internally tagged with the field `t`, whose value is the snake_case variant
/// name (`"status"`, `"files_changed"`, `"query_result"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
    /// Snapshot of the daemon's state.
    Status {
        /// Process id of the daemon.
        pid: u32,
        /// Free-form status string.
        status: String,
        /// Number of files reported by the daemon.
        files: usize,
    },
    /// A batch of file changes.
    FilesChanged {
        /// The changes in this batch.
        batch: Vec<FileChange>,
        /// Time the batch was recorded (wire name `ts`).
        #[serde(rename = "ts")]
        timestamp: u64,
    },
    /// Reply to a [`ClientMessage`] query.
    QueryResult {
        /// Identifier attached to the reply.
        id: u64,
        /// Status string at the time of the reply.
        status: String,
        /// File count at the time of the reply.
        files: usize,
        /// Changes matching the query; omitted from the JSON when empty and
        /// defaulted to empty when absent.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        changes_since: Vec<FileChange>,
    },
}
127
/// Messages sent from a client to the daemon.
///
/// Internally tagged with the field `t`, whose value is the snake_case variant
/// name (`"status_query"`, `"history_query"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Asks for the daemon's current status and file count.
    StatusQuery,
    /// Asks for the changes recorded after a given timestamp.
    HistoryQuery {
        /// Only batches with a timestamp strictly greater than this are returned.
        since: u64,
        /// Caller-chosen identifier echoed back in the reply.
        id: u64,
    },
}
142
/// Errors produced by the daemon socket server and client.
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
    /// An I/O operation on the socket or its directory failed.
    #[error("daemon socket I/O: {0}")]
    Io(#[from] std::io::Error),
    /// A message could not be encoded to or decoded from JSON.
    #[error("daemon socket JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The socket path could not be determined.
    // NOTE(review): nothing in the visible part of this file constructs this variant.
    #[error("daemon socket path resolution failed")]
    PathResolution,
}
156
/// Result alias whose error type is [`DaemonSockError`].
type Result<T> = std::result::Result<T, DaemonSockError>;
158
159#[must_use]
168pub fn socket_path(root: &Path) -> PathBuf {
169 let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
170 let hash = format!(
171 "{:016x}",
172 xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
173 );
174
175 if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
176 let dir = PathBuf::from(xdg).join("ixd");
177 return dir.join(format!("{hash}.sock"));
178 }
179
180 if let Ok(home) = std::env::var("HOME") {
181 let dir = PathBuf::from(home).join(".local/run/ixd");
182 return dir.join(format!("{hash}.sock"));
183 }
184
185 let uid = unsafe { libc::getuid() };
186 PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
187}
188
/// Creates the directory that will hold the socket at `path`, including any
/// missing ancestors. A path without a parent component is left untouched.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir),
        None => Ok(()),
    }
}
196
/// Bounded in-memory log of recent change batches.
struct History {
    /// `(timestamp, batch)` pairs in insertion order, oldest at the front.
    entries: VecDeque<(u64, Vec<FileChange>)>,
}
201
202impl History {
203 fn new() -> Self {
204 Self {
205 entries: VecDeque::with_capacity(HISTORY_CAPACITY),
206 }
207 }
208
209 fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
210 if self.entries.len() >= HISTORY_CAPACITY {
211 self.entries.pop_front();
212 }
213 self.entries.push_back((timestamp, changes));
214 }
215
216 fn since(&self, cutoff: u64) -> Vec<FileChange> {
217 self.entries
218 .iter()
219 .filter(|(ts, _)| *ts > cutoff)
220 .flat_map(|(_, changes)| changes.iter().cloned())
221 .collect()
222 }
223}
224
/// State shared, behind a mutex, between the server handle, the accept thread
/// and the per-client reader threads.
struct Shared {
    /// Connections used for writing to clients; entries are removed in
    /// `broadcast` when a send reports failure.
    clients: Vec<ClientConn>,
    /// Recent change batches served to history queries.
    history: History,
    /// Process id captured when the server was created.
    pid: u32,
    /// Current status string; `"idle"` on creation, updated by `set_status`.
    status: String,
    /// Most recently reported file count.
    files_count: usize,
}
233
/// Server-side handle used to write messages to one connected client.
struct ClientConn {
    /// Socket connected to the client.
    stream: UnixStream,
}
237
238impl ClientConn {
239 fn send(&mut self, msg: &ServerMessage) -> bool {
240 let Ok(mut line) = serde_json::to_string(msg) else {
241 return false;
242 };
243 line.push('\n');
244 self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
245 }
246}
247
/// Unix-socket server that pushes newline-delimited JSON messages to connected
/// clients and answers their queries.
pub struct DaemonServer {
    /// State shared with the accept thread and the client reader threads.
    shared: Arc<Mutex<Shared>>,
    /// Listening socket bound in `new`; cloned for the accept thread in `start`.
    listener: UnixListener,
    /// Filesystem path of the socket; the file is removed again on drop.
    socket_path: PathBuf,
    /// Handle of the accept thread once `start` has run; joined on drop.
    accept_handle: Option<std::thread::JoinHandle<()>>,
    /// Set to `true` on creation and to `false` on drop; the accept loop and
    /// the client reader loops poll it to know when to stop.
    running: Arc<std::sync::atomic::AtomicBool>,
}
259
260impl DaemonServer {
261 pub fn new(root: &Path) -> Result<Self> {
271 let sp = socket_path(root);
272 ensure_socket_dir(&sp)?;
273
274 let _ = std::fs::remove_file(&sp);
276
277 let listener = UnixListener::bind(&sp)?;
278
279 let pid = std::process::id();
280 let shared = Arc::new(Mutex::new(Shared {
281 clients: Vec::new(),
282 history: History::new(),
283 pid,
284 status: "idle".to_string(),
285 files_count: 0,
286 }));
287 let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
288
289 Ok(Self {
290 shared,
291 listener,
292 socket_path: sp,
293 accept_handle: None,
294 running,
295 })
296 }
297
298 #[must_use]
300 pub fn path(&self) -> &Path {
301 &self.socket_path
302 }
303
304 pub fn start(&mut self) -> Result<()> {
315 let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
316 let shared = Arc::clone(&self.shared);
317 let running = Arc::clone(&self.running);
318
319 let handle = std::thread::Builder::new()
320 .name("ixd-sock-accept".to_string())
321 .spawn(move || {
322 if let Err(e) = listener.set_nonblocking(true) {
323 tracing::error!("ixd: cannot set nonblocking: {e}");
324 return;
325 }
326
327 while running.load(std::sync::atomic::Ordering::SeqCst) {
328 match listener.accept() {
329 Ok((stream, _)) => {
330 if let Err(e) = stream.set_nonblocking(false) {
331 tracing::warn!("ixd: cannot set blocking on client: {e}");
332 continue;
333 }
334 let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
335 let read_stream = match stream.try_clone() {
336 Ok(s) => s,
337 Err(e) => {
338 tracing::warn!("ixd: cannot clone stream: {e}");
339 continue;
340 }
341 };
342 let shared_clone = Arc::clone(&shared);
343 let running_clone = Arc::clone(&running);
344 if let Err(e) = std::thread::Builder::new()
345 .name("ixd-sock-client".to_string())
346 .spawn(move || {
347 client_read_loop(&read_stream, &shared_clone, &running_clone);
348 })
349 {
350 tracing::warn!("ixd: failed to spawn client thread: {e}");
351 continue;
352 }
353 let conn = ClientConn { stream };
354 if let Ok(mut s) = shared.lock() {
355 s.clients.push(conn);
356 }
357 }
358 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
359 std::thread::sleep(std::time::Duration::from_millis(100));
360 }
361 Err(e) => {
362 tracing::warn!("ixd: accept error: {e}");
363 std::thread::sleep(std::time::Duration::from_millis(200));
364 }
365 }
366 }
367 })
368 .map_err(DaemonSockError::Io)?;
369
370 self.accept_handle = Some(handle);
371 Ok(())
372 }
373
374 pub fn broadcast(&self, msg: &ServerMessage) {
380 let Ok(mut s) = self.shared.lock() else {
381 return;
382 };
383 s.clients.retain_mut(|c| c.send(msg));
384 }
385
386 pub fn set_status(&self, status: &str, files_count: usize) {
389 if let Ok(mut s) = self.shared.lock() {
390 s.status = status.to_string();
391 s.files_count = files_count;
392 }
393 }
394
395 pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
397 let timestamp = now_secs();
398 if let Ok(mut s) = self.shared.lock() {
399 s.history.push(timestamp, changes.clone());
400 s.files_count = files_count;
401 }
402 let msg = ServerMessage::FilesChanged {
403 batch: changes,
404 timestamp,
405 };
406 self.broadcast(&msg);
407 }
408}
409
410fn client_read_loop(
411 stream: &UnixStream,
412 shared: &Arc<Mutex<Shared>>,
413 running: &Arc<std::sync::atomic::AtomicBool>,
414) {
415 let reader = BufReader::new(stream);
416 for line in reader.lines() {
417 if !running.load(std::sync::atomic::Ordering::SeqCst) {
418 break;
419 }
420 let Ok(line) = line else { break };
421
422 let msg: ClientMessage = match serde_json::from_str(&line) {
423 Ok(m) => m,
424 Err(e) => {
425 tracing::debug!("ixd: malformed client message: {e}");
426 continue;
427 }
428 };
429
430 let response = match msg {
431 ClientMessage::StatusQuery => {
432 let Ok(s) = shared.lock() else {
433 tracing::warn!("ixd: shared lock poisoned in status query");
434 continue;
435 };
436 ServerMessage::QueryResult {
437 id: u64::from(s.pid),
438 status: s.status.clone(),
439 files: s.files_count,
440 changes_since: Vec::new(),
441 }
442 }
443 ClientMessage::HistoryQuery { since, id } => {
444 let Ok(s) = shared.lock() else {
445 tracing::warn!("ixd: shared lock poisoned in history query");
446 continue;
447 };
448 let changes = s.history.since(since);
449 ServerMessage::QueryResult {
450 id,
451 status: s.status.clone(),
452 files: s.files_count,
453 changes_since: changes,
454 }
455 }
456 };
457
458 if let Ok(mut write_stream) = stream.try_clone() {
459 let mut line = serde_json::to_string(&response).unwrap_or_default();
460 line.push('\n');
461 let _ = write_stream.write_all(line.as_bytes());
462 let _ = write_stream.flush();
463 }
464 }
465}
466
/// Client side of the daemon socket: sends queries and receives
/// newline-delimited JSON messages.
pub struct DaemonClient {
    /// Buffered connection to the daemon; writes go to the inner stream.
    stream: BufReader<UnixStream>,
}
471
472impl DaemonClient {
473 pub fn connect(root: &Path) -> Result<Self> {
479 let sp = socket_path(root);
480 let stream = UnixStream::connect(&sp)?;
481 Ok(Self {
482 stream: BufReader::new(stream),
483 })
484 }
485
486 pub fn recv(&mut self) -> Result<ServerMessage> {
492 let mut line = String::new();
493 let bytes = self.stream.read_line(&mut line)?;
494 if bytes == 0 {
495 return Err(DaemonSockError::Io(std::io::Error::new(
496 std::io::ErrorKind::UnexpectedEof,
497 "daemon closed connection",
498 )));
499 }
500 let msg: ServerMessage = serde_json::from_str(line.trim_end())?;
501 Ok(msg)
502 }
503
504 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
510 let stream = self.stream.get_mut();
511 let mut line = serde_json::to_string(msg)?;
512 line.push('\n');
513 stream.write_all(line.as_bytes())?;
514 stream.flush()?;
515 Ok(())
516 }
517}
518
/// Returns the current time as whole seconds since the Unix epoch, or `0` when
/// the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
526
527impl Drop for DaemonServer {
528 fn drop(&mut self) {
529 self.running
530 .store(false, std::sync::atomic::Ordering::SeqCst);
531 if let Some(handle) = self.accept_handle.take() {
532 let _ = handle.join();
533 }
534 let _ = std::fs::remove_file(&self.socket_path);
535 }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Serialises the tests whose outcome depends on the socket-related
    /// environment variables staying put; the test harness runs tests in
    /// parallel and the environment is process-wide.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes [`ENV_LOCK`], recovering it if an earlier test panicked while
    /// holding it.
    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    #[test]
    fn socket_path_deterministic() {
        // Both calls must see the same environment.
        let _env = env_guard();
        let root = PathBuf::from("/tmp/test-project");
        let p1 = socket_path(&root);
        let p2 = socket_path(&root);
        assert_eq!(p1, p2, "same root must produce same socket path");
    }

    #[test]
    fn socket_path_different_roots() {
        let r1 = PathBuf::from("/tmp/project-a");
        let r2 = PathBuf::from("/tmp/project-b");
        assert_ne!(socket_path(&r1), socket_path(&r2));
    }

    #[test]
    fn socket_path_uses_xdg() {
        let _env = env_guard();
        let previous = std::env::var_os("XDG_RUNTIME_DIR");

        // SAFETY: assumes no foreign (non-Rust) code reads the environment
        // concurrently in this test binary; Rust-side accesses go through
        // `std::env`, and `ENV_LOCK` keeps path-comparing tests out meanwhile.
        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
        let p = socket_path(Path::new("/tmp/some-project"));

        // Put the original value back before asserting, so neither a failed
        // assertion nor this test itself changes what later tests observe.
        // SAFETY: same reasoning as for the `set_var` above.
        unsafe {
            match previous {
                Some(value) => std::env::set_var("XDG_RUNTIME_DIR", value),
                None => std::env::remove_var("XDG_RUNTIME_DIR"),
            }
        }

        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
        assert!(p.extension().is_some_and(|e| e == "sock"));
    }

    #[test]
    fn server_message_ndjson_roundtrip() {
        let msg = ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 42,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        assert!(json.contains("\"t\":\"status\""), "tag field present");

        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::Status { pid, status, files } = back {
            assert_eq!(pid, 1234);
            assert_eq!(status, "idle");
            assert_eq!(files, 42);
        } else {
            panic!("wrong variant after roundtrip");
        }
    }

    #[test]
    fn files_changed_roundtrip() {
        let msg = ServerMessage::FilesChanged {
            batch: vec![FileChange {
                path: PathBuf::from("src/main.rs"),
                mtime: 1_776_468_629,
                op: FileOp::Modify,
            }],
            timestamp: 1_776_468_629,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::FilesChanged { batch, timestamp } = back {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
            assert_eq!(timestamp, 1_776_468_629);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn client_message_roundtrip() {
        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
        if let ClientMessage::HistoryQuery { since, id } = back {
            assert_eq!(since, 1000);
            assert_eq!(id, 7);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn history_since() {
        let mut h = History::new();
        h.push(
            100,
            vec![FileChange {
                path: PathBuf::from("a.rs"),
                mtime: 100,
                op: FileOp::Create,
            }],
        );
        h.push(
            200,
            vec![FileChange {
                path: PathBuf::from("b.rs"),
                mtime: 200,
                op: FileOp::Modify,
            }],
        );
        h.push(
            300,
            vec![FileChange {
                path: PathBuf::from("c.rs"),
                mtime: 300,
                op: FileOp::Delete,
            }],
        );

        let changes = h.since(150);
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
    }

    #[test]
    fn history_capacity() {
        let mut h = History::new();
        // One push more than the capacity, so exactly the first entry is evicted.
        for i in 0..=HISTORY_CAPACITY {
            h.push(
                i as u64,
                vec![FileChange {
                    path: PathBuf::from(format!("f{i}")),
                    mtime: i as u64,
                    op: FileOp::Modify,
                }],
            );
        }
        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
        assert_eq!(h.entries.front().expect("non-empty").0, 1);
    }

    #[test]
    fn server_client_connect_and_broadcast() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        server.start().expect("start accept thread");

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept loop (100 ms poll interval) time to register the client.
        std::thread::sleep(std::time::Duration::from_millis(200));

        server.set_status("idle", 10);

        server.broadcast(&ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 10,
        });

        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::Status { pid, status, files }) => {
                assert_eq!(pid, 1234);
                assert_eq!(status, "idle");
                assert_eq!(files, 10);
            }
            Ok(other) => panic!("expected Status, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }

    #[test]
    fn client_query_status() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        server.start().expect("start accept thread");
        server.set_status("indexing", 99);

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept loop (100 ms poll interval) time to spawn the reader thread.
        std::thread::sleep(std::time::Duration::from_millis(200));

        client
            .send(&ClientMessage::StatusQuery)
            .expect("send query");

        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::QueryResult {
                id: _,
                status,
                files,
                changes_since,
            }) => {
                assert_eq!(status, "indexing");
                assert_eq!(files, 99);
                assert!(changes_since.is_empty());
            }
            Ok(other) => panic!("expected QueryResult, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }
}