1use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
/// Maximum number of change batches kept in the in-memory history; once this
/// many are stored, the oldest batch is evicted for each new one.
const HISTORY_CAPACITY: usize = 1024;
52
/// Kind of filesystem operation recorded for a changed file.
///
/// Serialized as the lowercase variant name (`"create"`, `"modify"`,
/// `"delete"`, `"rename"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
    /// The file was created.
    Create,
    /// The file was modified.
    Modify,
    /// The file was deleted.
    Delete,
    /// The file was renamed.
    Rename,
}
66
67impl FileOp {
68 #[must_use]
70 pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71 match kind {
72 notify::EventKind::Create(_) => Self::Create,
73 notify::EventKind::Remove(_) => Self::Delete,
74 _ => Self::Modify,
75 }
76 }
77}
78
/// A single changed file as carried in wire messages and stored in history.
///
/// Field names are shortened on the wire to `p`, `m` and `o`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    /// Path of the affected file (wire name `p`).
    #[serde(rename = "p")]
    pub path: PathBuf,
    /// Modification time (wire name `m`).
    /// NOTE(review): presumably seconds since the Unix epoch — confirm against the producer.
    #[serde(rename = "m")]
    pub mtime: u64,
    /// Operation that affected the file (wire name `o`).
    #[serde(rename = "o")]
    pub op: FileOp,
}
92
/// Messages sent from the daemon to connected clients.
///
/// Serialized as internally tagged JSON: the variant is stored in the `t`
/// field using its snake_case name (`status`, `files_changed`,
/// `query_result`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
    /// Daemon status announcement.
    Status {
        /// Process id reported by the daemon.
        pid: u32,
        /// Free-form status string (e.g. `"idle"`).
        status: String,
        /// Number of files reported by the daemon.
        files: usize,
    },
    /// A batch of file changes pushed to clients.
    FilesChanged {
        /// The changed files in this batch.
        batch: Vec<FileChange>,
        /// Time the batch was recorded (wire name `ts`).
        #[serde(rename = "ts")]
        timestamp: u64,
    },
    /// Reply to a client query.
    QueryResult {
        /// Identifier associated with the reply.
        id: u64,
        /// Daemon status string at the time of the reply.
        status: String,
        /// File count at the time of the reply.
        files: usize,
        /// Changes matching the query; omitted from the JSON when empty and
        /// defaulted to empty when absent on deserialization.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        changes_since: Vec<FileChange>,
    },
}
127
/// Messages sent from a client to the daemon.
///
/// Serialized as internally tagged JSON with the snake_case variant name in
/// the `t` field (`status_query`, `history_query`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Request the daemon's current status and file count.
    StatusQuery,
    /// Request the recorded changes newer than a cutoff.
    HistoryQuery {
        /// Cutoff timestamp for the history lookup.
        since: u64,
        /// Caller-chosen identifier for this query.
        id: u64,
    },
}
142
/// Errors produced by the daemon socket server and client.
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
    /// An underlying socket or filesystem I/O operation failed.
    #[error("daemon socket I/O: {0}")]
    Io(#[from] std::io::Error),
    /// A message could not be serialized to, or parsed from, JSON.
    #[error("daemon socket JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The socket path could not be resolved.
    #[error("daemon socket path resolution failed")]
    PathResolution,
}
156
/// Module-local result alias whose error type is [`DaemonSockError`].
type Result<T> = std::result::Result<T, DaemonSockError>;
158
159#[must_use]
168pub fn socket_path(root: &Path) -> PathBuf {
169 let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
170 let hash = format!(
171 "{:016x}",
172 xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
173 );
174
175 if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
176 let dir = PathBuf::from(xdg).join("ixd");
177 return dir.join(format!("{hash}.sock"));
178 }
179
180 if let Ok(home) = std::env::var("HOME") {
181 let dir = PathBuf::from(home).join(".local/run/ixd");
182 return dir.join(format!("{hash}.sock"));
183 }
184
185 let uid = unsafe { libc::getuid() };
186 PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
187}
188
/// Creates the parent directory of `path` (and any missing ancestors).
///
/// Succeeds without doing anything when `path` has no parent component.
///
/// # Errors
///
/// Returns the I/O error reported while creating the directories.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir),
        None => Ok(()),
    }
}
196
/// Bounded, in-memory log of recent change batches.
struct History {
    /// `(timestamp, batch)` pairs in insertion order, oldest at the front.
    entries: VecDeque<(u64, Vec<FileChange>)>,
}
201
202impl History {
203 fn new() -> Self {
204 Self {
205 entries: VecDeque::with_capacity(HISTORY_CAPACITY),
206 }
207 }
208
209 fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
210 if self.entries.len() >= HISTORY_CAPACITY {
211 self.entries.pop_front();
212 }
213 self.entries.push_back((timestamp, changes));
214 }
215
216 fn since(&self, cutoff: u64) -> Vec<FileChange> {
217 self.entries
218 .iter()
219 .filter(|(ts, _)| *ts > cutoff)
220 .flat_map(|(_, changes)| changes.iter().cloned())
221 .collect()
222 }
223}
224
/// Mutable daemon state shared between the server handle, the accept thread
/// and the per-client reader threads (behind an `Arc<Mutex<_>>`).
struct Shared {
    /// Write halves of the connected clients that receive broadcasts.
    clients: Vec<ClientConn>,
    /// Recent change batches served to history queries.
    history: History,
    /// Process id reported to clients.
    pid: u32,
    /// Current free-form status string.
    status: String,
    /// Most recently reported file count.
    files_count: usize,
}
233
/// Server-side handle used to push messages to one connected client.
struct ClientConn {
    /// Socket connected to the client.
    stream: UnixStream,
}
237
238impl ClientConn {
239 fn send(&mut self, msg: &ServerMessage) -> bool {
240 let Ok(mut line) = serde_json::to_string(msg) else {
241 return false;
242 };
243 line.push('\n');
244 self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
245 }
246}
247
/// Server side of the daemon socket: owns the listener, the shared state and
/// the accept thread.
pub struct DaemonServer {
    /// State shared with the accept thread and the per-client reader threads.
    shared: Arc<Mutex<Shared>>,
    /// The bound listening socket.
    listener: UnixListener,
    /// Filesystem path the listener is bound to.
    socket_path: PathBuf,
    /// Join handle of the accept thread; `None` until one has been spawned.
    accept_handle: Option<std::thread::JoinHandle<()>>,
    /// Flag polled by the background loops; initialised to `true`.
    running: Arc<std::sync::atomic::AtomicBool>,
}
259
260impl DaemonServer {
261 pub fn new(root: &Path) -> Result<Self> {
271 let sp = socket_path(root);
272 ensure_socket_dir(&sp)?;
273
274 if sp.exists() || sp.is_symlink() {
275 let msg = if sp.is_symlink() {
276 format!("symlink attack detected at {}", sp.display())
277 } else {
278 format!("socket file already exists at {}", sp.display())
279 };
280 return Err(DaemonSockError::Io(std::io::Error::new(
281 std::io::ErrorKind::AddrInUse,
282 msg,
283 )));
284 }
285
286 let listener = UnixListener::bind(&sp)?;
287
288 let pid = std::process::id();
289 let shared = Arc::new(Mutex::new(Shared {
290 clients: Vec::new(),
291 history: History::new(),
292 pid,
293 status: "idle".to_string(),
294 files_count: 0,
295 }));
296 let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
297
298 Ok(Self {
299 shared,
300 listener,
301 socket_path: sp,
302 accept_handle: None,
303 running,
304 })
305 }
306
307 #[must_use]
309 pub fn path(&self) -> &Path {
310 &self.socket_path
311 }
312
313 pub fn start(&mut self) -> Result<()> {
324 let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
325 let shared = Arc::clone(&self.shared);
326 let running = Arc::clone(&self.running);
327
328 let handle = std::thread::Builder::new()
329 .name("ixd-sock-accept".to_string())
330 .spawn(move || {
331 if let Err(e) = listener.set_nonblocking(true) {
332 tracing::error!("ixd: cannot set nonblocking: {e}");
333 return;
334 }
335
336 while running.load(std::sync::atomic::Ordering::SeqCst) {
337 match listener.accept() {
338 Ok((stream, _)) => {
339 if let Err(e) = stream.set_nonblocking(false) {
340 tracing::warn!("ixd: cannot set blocking on client: {e}");
341 continue;
342 }
343 let _ =
344 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
345 let read_stream = match stream.try_clone() {
346 Ok(s) => s,
347 Err(e) => {
348 tracing::warn!("ixd: cannot clone stream: {e}");
349 continue;
350 }
351 };
352 let shared_clone = Arc::clone(&shared);
353 let running_clone = Arc::clone(&running);
354 if let Err(e) = std::thread::Builder::new()
355 .name("ixd-sock-client".to_string())
356 .spawn(move || {
357 client_read_loop(&read_stream, &shared_clone, &running_clone);
358 })
359 {
360 tracing::warn!("ixd: failed to spawn client thread: {e}");
361 continue;
362 }
363 let conn = ClientConn { stream };
364 if let Ok(mut s) = shared.lock() {
365 s.clients.push(conn);
366 }
367 }
368 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
369 std::thread::sleep(std::time::Duration::from_millis(100));
370 }
371 Err(e) => {
372 tracing::warn!("ixd: accept error: {e}");
373 std::thread::sleep(std::time::Duration::from_millis(200));
374 }
375 }
376 }
377 })
378 .map_err(DaemonSockError::Io)?;
379
380 self.accept_handle = Some(handle);
381 Ok(())
382 }
383
384 pub fn broadcast(&self, msg: &ServerMessage) {
390 let Ok(mut s) = self.shared.lock() else {
391 return;
392 };
393 s.clients.retain_mut(|c| c.send(msg));
394 }
395
396 pub fn set_status(&self, status: &str, files_count: usize) {
399 if let Ok(mut s) = self.shared.lock() {
400 s.status = status.to_string();
401 s.files_count = files_count;
402 }
403 }
404
405 pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
407 let timestamp = now_secs();
408 if let Ok(mut s) = self.shared.lock() {
409 s.history.push(timestamp, changes.clone());
410 s.files_count = files_count;
411 let msg = ServerMessage::FilesChanged {
412 batch: changes,
413 timestamp,
414 };
415 s.clients.retain_mut(|c| c.send(&msg));
416 }
417 }
418}
419fn client_read_loop(
420 stream: &UnixStream,
421 shared: &Arc<Mutex<Shared>>,
422 running: &Arc<std::sync::atomic::AtomicBool>,
423) {
424 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
425 let mut reader = BufReader::new(stream);
426 let mut line_buf = String::new();
427
428 loop {
429 if !running.load(std::sync::atomic::Ordering::SeqCst) {
430 break;
431 }
432 line_buf.clear();
433 match reader.read_line(&mut line_buf) {
434 Ok(0) => break,
435 Ok(_) => {
436 let msg: ClientMessage = match serde_json::from_str(&line_buf) {
437 Ok(m) => m,
438 Err(e) => {
439 tracing::debug!("ixd: malformed client message: {e}");
440 continue;
441 }
442 };
443
444 let response = match msg {
445 ClientMessage::StatusQuery => {
446 let Ok(s) = shared.lock() else {
447 tracing::warn!("ixd: shared lock poisoned in status query");
448 continue;
449 };
450 ServerMessage::QueryResult {
451 id: u64::from(s.pid),
452 status: s.status.clone(),
453 files: s.files_count,
454 changes_since: Vec::new(),
455 }
456 }
457 ClientMessage::HistoryQuery { since, id } => {
458 let Ok(s) = shared.lock() else {
459 tracing::warn!("ixd: shared lock poisoned in history query");
460 continue;
461 };
462 let changes = s.history.since(since);
463 ServerMessage::QueryResult {
464 id,
465 status: s.status.clone(),
466 files: s.files_count,
467 changes_since: changes,
468 }
469 }
470 };
471
472if let Ok(mut write_stream) = stream.try_clone() {
473 match serde_json::to_string(&response) {
474 Ok(mut line) => {
475 line.push('\n');
476 if write_stream.write_all(line.as_bytes()).is_err()
477 || write_stream.flush().is_err()
478 {
479 break;
480 }
481 }
482 Err(e) => {
483 tracing::warn!("ixd: failed to serialize query response: {e}");
484 break;
485 }
486 }
487 }
488 }
489 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
490 Err(_) => break,
491 }
492 }
493}
494
/// Client side of the daemon socket.
pub struct DaemonClient {
    /// Buffered connection to the daemon; reads go through the buffer, writes
    /// go to the underlying stream.
    stream: BufReader<UnixStream>,
}
499
500impl DaemonClient {
501 pub fn connect(root: &Path) -> Result<Self> {
507 let sp = socket_path(root);
508 let stream = UnixStream::connect(&sp)?;
509 stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
510 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
511 Ok(Self {
512 stream: BufReader::new(stream),
513 })
514 }
515
516 pub fn recv(&mut self) -> Result<ServerMessage> {
522 let mut line = String::new();
523 let bytes = self.stream.read_line(&mut line).map_err(|e| {
524 if e.kind() == std::io::ErrorKind::TimedOut {
525 DaemonSockError::Io(std::io::Error::new(
526 std::io::ErrorKind::TimedOut,
527 "recv timed out after 5s",
528 ))
529 } else {
530 DaemonSockError::Io(e)
531 }
532 })?;
533 if bytes == 0 {
534 return Err(DaemonSockError::Io(std::io::Error::new(
535 std::io::ErrorKind::UnexpectedEof,
536 "daemon closed connection",
537 )));
538 }
539 let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
540 DaemonSockError::Io(std::io::Error::new(
541 std::io::ErrorKind::InvalidData,
542 format!("invalid JSON: {e}"),
543 ))
544 })?;
545 Ok(msg)
546 }
547
548 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
554 let stream = self.stream.get_mut();
555 let mut line = serde_json::to_string(msg)?;
556 line.push('\n');
557 stream.write_all(line.as_bytes())?;
558 stream.flush()?;
559 Ok(())
560 }
561}
562
/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or `0` if the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
570
571impl Drop for DaemonServer {
572 fn drop(&mut self) {
573 self.running
574 .store(false, std::sync::atomic::Ordering::SeqCst);
575 if let Some(handle) = self.accept_handle.take() {
576 let _ = handle.join();
577 }
578 let _ = std::fs::remove_file(&self.socket_path);
579 }
580}
581
// Unit and integration tests for the daemon socket protocol, history buffer
// and the server/client round trip over a real Unix socket.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // The same root must always map to the same socket path.
    #[test]
    fn socket_path_deterministic() {
        let root = PathBuf::from("/tmp/test-project");
        let p1 = socket_path(&root);
        let p2 = socket_path(&root);
        assert_eq!(p1, p2, "same root must produce same socket path");
    }

    // Different roots must map to different socket paths.
    #[test]
    fn socket_path_different_roots() {
        let r1 = PathBuf::from("/tmp/project-a");
        let r2 = PathBuf::from("/tmp/project-b");
        assert_ne!(socket_path(&r1), socket_path(&r2));
    }

    // When XDG_RUNTIME_DIR is set, the socket lives under `<dir>/ixd/` and
    // ends in `.sock`.
    // NOTE(review): this mutates the process environment, which is shared
    // with tests running in parallel, and does not restore a previous value.
    #[test]
    fn socket_path_uses_xdg() {
        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
        let p = socket_path(Path::new("/tmp/some-project"));
        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
        assert!(p.extension().is_some_and(|e| e == "sock"));
        unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
    }

    // A Status message carries the `t` tag and survives a JSON round trip.
    #[test]
    fn server_message_ndjson_roundtrip() {
        let msg = ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 42,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        assert!(json.contains("\"t\":\"status\""), "tag field present");

        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::Status { pid, status, files } = back {
            assert_eq!(pid, 1234);
            assert_eq!(status, "idle");
            assert_eq!(files, 42);
        } else {
            panic!("wrong variant after roundtrip");
        }
    }

    // A FilesChanged message keeps its batch and timestamp across a JSON
    // round trip.
    #[test]
    fn files_changed_roundtrip() {
        let msg = ServerMessage::FilesChanged {
            batch: vec![FileChange {
                path: PathBuf::from("src/main.rs"),
                mtime: 1_776_468_629,
                op: FileOp::Modify,
            }],
            timestamp: 1_776_468_629,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::FilesChanged { batch, timestamp } = back {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
            assert_eq!(timestamp, 1_776_468_629);
        } else {
            panic!("wrong variant");
        }
    }

    // A HistoryQuery keeps `since` and `id` across a JSON round trip.
    #[test]
    fn client_message_roundtrip() {
        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
        if let ClientMessage::HistoryQuery { since, id } = back {
            assert_eq!(since, 1000);
            assert_eq!(id, 7);
        } else {
            panic!("wrong variant");
        }
    }

    // `since` returns only batches strictly newer than the cutoff, in
    // insertion order.
    #[test]
    fn history_since() {
        let mut h = History::new();
        h.push(
            100,
            vec![FileChange {
                path: PathBuf::from("a.rs"),
                mtime: 100,
                op: FileOp::Create,
            }],
        );
        h.push(
            200,
            vec![FileChange {
                path: PathBuf::from("b.rs"),
                mtime: 200,
                op: FileOp::Modify,
            }],
        );
        h.push(
            300,
            vec![FileChange {
                path: PathBuf::from("c.rs"),
                mtime: 300,
                op: FileOp::Delete,
            }],
        );

        let changes = h.since(150);
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
    }

    // Pushing one batch more than the capacity evicts the oldest entry.
    #[test]
    fn history_capacity() {
        let mut h = History::new();
        for i in 0..=HISTORY_CAPACITY {
            h.push(
                i as u64,
                vec![FileChange {
                    path: PathBuf::from(format!("f{i}")),
                    mtime: i as u64,
                    op: FileOp::Modify,
                }],
            );
        }
        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
        // Entry 0 was evicted, so the oldest remaining timestamp is 1.
        assert_eq!(h.entries.front().expect("non-empty").0, 1);
    }

    // End-to-end: a connected client receives a broadcast Status message.
    #[test]
    fn server_client_connect_and_broadcast() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        let _ = server.start();

        // Connect directly to the server's socket path.
        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept thread time to register the client before the
        // broadcast.
        std::thread::sleep(std::time::Duration::from_millis(200));

        server.set_status("idle", 10);

        server.broadcast(&ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 10,
        });

        // Bound the wait so a missing broadcast fails the test quickly.
        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::Status { pid, status, files }) => {
                assert_eq!(pid, 1234);
                assert_eq!(status, "idle");
                assert_eq!(files, 10);
            }
            Ok(other) => panic!("expected Status, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }

    // End-to-end: a StatusQuery is answered with the server's current status
    // and file count and an empty change list.
    #[test]
    fn client_query_status() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        let _ = server.start();
        server.set_status("indexing", 99);

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept thread time to pick up the connection and start
        // the reader thread.
        std::thread::sleep(std::time::Duration::from_millis(200));

        client
            .send(&ClientMessage::StatusQuery)
            .expect("send query");

        // Bound the wait so a missing reply fails the test quickly.
        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::QueryResult {
                id: _,
                status,
                files,
                changes_since,
            }) => {
                assert_eq!(status, "indexing");
                assert_eq!(files, 99);
                assert!(changes_since.is_empty());
            }
            Ok(other) => panic!("expected QueryResult, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }
}