1use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
50const HISTORY_CAPACITY: usize = 1024;
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(rename_all = "lowercase")]
56pub enum FileOp {
57 Create,
59 Modify,
61 Delete,
63 Rename,
65}
66
67impl FileOp {
68 #[must_use]
70 pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71 match kind {
72 notify::EventKind::Create(_) => Self::Create,
73 notify::EventKind::Remove(_) => Self::Delete,
74 _ => Self::Modify,
75 }
76 }
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(tag = "state", rename_all = "snake_case")]
82pub enum DaemonStatus {
83 Idle,
85 Indexing {
87 entropy: u16,
89 },
90 Deferred {
92 entropy: u16,
94 },
95 Escalated {
97 entropy: u16,
99 },
100 Warned {
102 reason: String,
104 },
105 SafetyHalt,
107 SafetyExit,
109}
110
111impl std::fmt::Display for DaemonStatus {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 Self::Idle { .. } => write!(f, "idle"),
115 Self::Indexing { entropy } => write!(f, "indexing (entropy: {entropy})"),
116 Self::Deferred { entropy } => write!(f, "deferred (entropy: {entropy})"),
117 Self::Escalated { entropy } => write!(f, "escalated (entropy: {entropy})"),
118 Self::Warned { reason } => write!(f, "warned: {reason}"),
119 Self::SafetyHalt => write!(f, "safety halt"),
120 Self::SafetyExit => write!(f, "safety exit"),
121 }
122 }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct FileChange {
128 #[serde(rename = "p")]
130 pub path: PathBuf,
131 #[serde(rename = "m")]
133 pub mtime: u64,
134 #[serde(rename = "o")]
136 pub op: FileOp,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(tag = "t", rename_all = "snake_case")]
142pub enum ServerMessage {
143 Status {
145 pid: u32,
147 status: String,
149 files: usize,
151 #[serde(skip_serializing_if = "Option::is_none")]
153 daemon_status: Option<DaemonStatus>,
154 },
155 FilesChanged {
157 batch: Vec<FileChange>,
159 #[serde(rename = "ts")]
161 timestamp: u64,
162 },
163 QueryResult {
165 id: u64,
167 status: String,
169 files: usize,
171 #[serde(default, skip_serializing_if = "Vec::is_empty")]
173 changes_since: Vec<FileChange>,
174 #[serde(skip_serializing_if = "Option::is_none")]
176 daemon_status: Option<DaemonStatus>,
177 #[serde(skip_serializing_if = "Option::is_none")]
179 last_rebuild_at: Option<u64>,
180 },
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
185#[serde(tag = "t", rename_all = "snake_case")]
186pub enum ClientMessage {
187 StatusQuery {
189 #[serde(default)]
191 id: u64,
192 },
193 HistoryQuery {
195 since: u64,
197 id: u64,
199 },
200}
201
202#[derive(Debug, thiserror::Error)]
204pub enum DaemonSockError {
205 #[error("daemon socket I/O: {0}")]
207 Io(#[from] std::io::Error),
208 #[error("daemon socket JSON: {0}")]
210 Json(#[from] serde_json::Error),
211 #[error("daemon socket path resolution failed")]
213 PathResolution,
214}
215
216type Result<T> = std::result::Result<T, DaemonSockError>;
217
218#[must_use]
227pub fn socket_path(root: &Path) -> PathBuf {
228 let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
229 let hash = format!(
230 "{:016x}",
231 xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
232 );
233
234 if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
235 let dir = PathBuf::from(xdg).join("ixd");
236 return dir.join(format!("{hash}.sock"));
237 }
238
239 if let Ok(home) = std::env::var("HOME") {
240 let dir = PathBuf::from(home).join(".local/run/ixd");
241 return dir.join(format!("{hash}.sock"));
242 }
243
244 let uid = unsafe { libc::getuid() };
245 PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
246}
247
248fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
250 if let Some(parent) = path.parent() {
251 std::fs::create_dir_all(parent)?;
252 }
253 Ok(())
254}
255
256struct History {
258 entries: VecDeque<(u64, Vec<FileChange>)>,
259}
260
261impl History {
262 fn new() -> Self {
263 Self {
264 entries: VecDeque::with_capacity(HISTORY_CAPACITY),
265 }
266 }
267
268 fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
269 if self.entries.len() >= HISTORY_CAPACITY {
270 self.entries.pop_front();
271 }
272 self.entries.push_back((timestamp, changes));
273 }
274
275 fn since(&self, cutoff: u64) -> Vec<FileChange> {
276 self.entries
277 .iter()
278 .filter(|(ts, _)| *ts > cutoff)
279 .flat_map(|(_, changes)| changes.iter().cloned())
280 .collect()
281 }
282}
283
284struct Shared {
286 clients: Vec<ClientConn>,
287 history: History,
288 status: String,
289 daemon_status: Option<DaemonStatus>,
290 last_rebuild_at: Option<u64>,
291 files_count: usize,
292}
293
294struct ClientConn {
295 stream: UnixStream,
296}
297
298impl ClientConn {
299 fn send(&mut self, msg: &ServerMessage) -> bool {
300 let Ok(mut line) = serde_json::to_string(msg) else {
301 return false;
302 };
303 line.push('\n');
304 self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
305 }
306}
307
308pub struct DaemonServer {
313 shared: Arc<Mutex<Shared>>,
314 listener: UnixListener,
315 socket_path: PathBuf,
316 accept_handle: Option<std::thread::JoinHandle<()>>,
317 running: Arc<std::sync::atomic::AtomicBool>,
318}
319
320impl DaemonServer {
321 pub fn new(root: &Path) -> Result<Self> {
331 let sp = socket_path(root);
332 ensure_socket_dir(&sp)?;
333
334 if sp.exists() || sp.is_symlink() {
335 let msg = if sp.is_symlink() {
336 format!("symlink attack detected at {}", sp.display())
337 } else {
338 format!("socket file already exists at {}", sp.display())
339 };
340 return Err(DaemonSockError::Io(std::io::Error::new(
341 std::io::ErrorKind::AddrInUse,
342 msg,
343 )));
344 }
345
346 let listener = UnixListener::bind(&sp)?;
347
348 let shared = Arc::new(Mutex::new(Shared {
349 clients: Vec::new(),
350 history: History::new(),
351 status: "idle".to_string(),
352 daemon_status: Some(DaemonStatus::Idle),
353 last_rebuild_at: None,
354 files_count: 0,
355 }));
356 let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
357
358 Ok(Self {
359 shared,
360 listener,
361 socket_path: sp,
362 accept_handle: None,
363 running,
364 })
365 }
366
367 #[must_use]
369 pub fn path(&self) -> &Path {
370 &self.socket_path
371 }
372
373 pub fn start(&mut self) -> Result<()> {
384 let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
385 let shared = Arc::clone(&self.shared);
386 let running = Arc::clone(&self.running);
387
388 let handle = std::thread::Builder::new()
389 .name("ixd-sock-accept".to_string())
390 .spawn(move || {
391 if let Err(e) = listener.set_nonblocking(true) {
392 tracing::error!("ixd: cannot set nonblocking: {e}");
393 return;
394 }
395
396 while running.load(std::sync::atomic::Ordering::SeqCst) {
397 match listener.accept() {
398 Ok((stream, _)) => {
399 if let Err(e) = stream.set_nonblocking(false) {
400 tracing::warn!("ixd: cannot set blocking on client: {e}");
401 continue;
402 }
403 let _ =
404 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
405 let read_stream = match stream.try_clone() {
406 Ok(s) => s,
407 Err(e) => {
408 tracing::warn!("ixd: cannot clone stream: {e}");
409 continue;
410 }
411 };
412 let shared_clone = Arc::clone(&shared);
413 let running_clone = Arc::clone(&running);
414 if let Err(e) = std::thread::Builder::new()
415 .name("ixd-sock-client".to_string())
416 .spawn(move || {
417 client_read_loop(&read_stream, &shared_clone, &running_clone);
418 })
419 {
420 tracing::warn!("ixd: failed to spawn client thread: {e}");
421 continue;
422 }
423 let conn = ClientConn { stream };
424 if let Ok(mut s) = shared.lock() {
425 s.clients.push(conn);
426 }
427 }
428 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
429 std::thread::sleep(std::time::Duration::from_millis(100));
430 }
431 Err(e) => {
432 tracing::warn!("ixd: accept error: {e}");
433 std::thread::sleep(std::time::Duration::from_millis(200));
434 }
435 }
436 }
437 })
438 .map_err(DaemonSockError::Io)?;
439
440 self.accept_handle = Some(handle);
441 Ok(())
442 }
443
444 pub fn broadcast(&self, msg: &ServerMessage) {
450 let Ok(mut s) = self.shared.lock() else {
451 return;
452 };
453 s.clients.retain_mut(|c| c.send(msg));
454 }
455
456 pub fn set_status(&self, daemon_status: &DaemonStatus, files_count: usize) {
459 if let Ok(mut s) = self.shared.lock() {
460 s.status = daemon_status.to_string();
461 s.daemon_status = Some(daemon_status.clone());
462 s.files_count = files_count;
463 }
464 }
465
466 pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
468 let timestamp = now_secs();
469 if let Ok(mut s) = self.shared.lock() {
470 s.history.push(timestamp, changes.clone());
471 s.files_count = files_count;
472 if matches!(s.daemon_status, Some(DaemonStatus::Idle)) {
473 s.status = "idle".to_string();
474 s.daemon_status = Some(DaemonStatus::Idle);
475 s.last_rebuild_at = Some(timestamp);
476 }
477 let msg = ServerMessage::FilesChanged {
478 batch: changes,
479 timestamp,
480 };
481 s.clients.retain_mut(|c| c.send(&msg));
482 }
483 }
484}
485fn client_read_loop(
486 stream: &UnixStream,
487 shared: &Arc<Mutex<Shared>>,
488 running: &Arc<std::sync::atomic::AtomicBool>,
489) {
490 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
491 let mut reader = BufReader::new(stream);
492 let mut line_buf = String::new();
493
494 loop {
495 if !running.load(std::sync::atomic::Ordering::SeqCst) {
496 break;
497 }
498 line_buf.clear();
499 match reader.read_line(&mut line_buf) {
500 Ok(0) => break,
501 Ok(_) => {
502 let msg: ClientMessage = match serde_json::from_str(&line_buf) {
503 Ok(m) => m,
504 Err(e) => {
505 tracing::debug!("ixd: malformed client message: {e}");
506 continue;
507 }
508 };
509
510 let response = match msg {
511 ClientMessage::StatusQuery { id } => {
512 let Ok(s) = shared.lock() else {
513 tracing::warn!("ixd: shared lock poisoned in status query");
514 continue;
515 };
516 ServerMessage::QueryResult {
517 id,
518 status: s.status.clone(),
519 files: s.files_count,
520 changes_since: Vec::new(),
521 daemon_status: s.daemon_status.clone(),
522 last_rebuild_at: s.last_rebuild_at,
523 }
524 }
525 ClientMessage::HistoryQuery { since, id } => {
526 let Ok(s) = shared.lock() else {
527 tracing::warn!("ixd: shared lock poisoned in history query");
528 continue;
529 };
530 let changes = s.history.since(since);
531 ServerMessage::QueryResult {
532 id,
533 status: s.status.clone(),
534 files: s.files_count,
535 changes_since: changes,
536 daemon_status: s.daemon_status.clone(),
537 last_rebuild_at: s.last_rebuild_at,
538 }
539 }
540 };
541
542if let Ok(mut write_stream) = stream.try_clone() {
543 match serde_json::to_string(&response) {
544 Ok(mut line) => {
545 line.push('\n');
546 if write_stream.write_all(line.as_bytes()).is_err()
547 || write_stream.flush().is_err()
548 {
549 break;
550 }
551 }
552 Err(e) => {
553 tracing::warn!("ixd: failed to serialize query response: {e}");
554 break;
555 }
556 }
557 }
558 }
559 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
560 Err(_) => break,
561 }
562 }
563}
564
565pub struct DaemonClient {
567 stream: BufReader<UnixStream>,
568}
569
570impl DaemonClient {
571 pub fn connect(root: &Path) -> Result<Self> {
577 let sp = socket_path(root);
578 let stream = UnixStream::connect(&sp)?;
579 stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
580 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
581 Ok(Self {
582 stream: BufReader::new(stream),
583 })
584 }
585
586 pub fn recv(&mut self) -> Result<ServerMessage> {
592 let mut line = String::new();
593 let bytes = self.stream.read_line(&mut line).map_err(|e| {
594 if e.kind() == std::io::ErrorKind::TimedOut {
595 DaemonSockError::Io(std::io::Error::new(
596 std::io::ErrorKind::TimedOut,
597 "recv timed out after 5s",
598 ))
599 } else {
600 DaemonSockError::Io(e)
601 }
602 })?;
603 if bytes == 0 {
604 return Err(DaemonSockError::Io(std::io::Error::new(
605 std::io::ErrorKind::UnexpectedEof,
606 "daemon closed connection",
607 )));
608 }
609 let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
610 DaemonSockError::Io(std::io::Error::new(
611 std::io::ErrorKind::InvalidData,
612 format!("invalid JSON: {e}"),
613 ))
614 })?;
615 Ok(msg)
616 }
617
618 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
624 let stream = self.stream.get_mut();
625 let mut line = serde_json::to_string(msg)?;
626 line.push('\n');
627 stream.write_all(line.as_bytes())?;
628 stream.flush()?;
629 Ok(())
630 }
631}
632
633fn now_secs() -> u64 {
635 SystemTime::now()
636 .duration_since(UNIX_EPOCH)
637 .unwrap_or_default()
638 .as_secs()
639}
640
641impl Drop for DaemonServer {
642 fn drop(&mut self) {
643 self.running
644 .store(false, std::sync::atomic::Ordering::SeqCst);
645 if let Some(handle) = self.accept_handle.take() {
646 let _ = handle.join();
647 }
648 let _ = std::fs::remove_file(&self.socket_path);
649 }
650}
651
652#[cfg(test)]
653mod tests {
654 use super::*;
655 use std::path::PathBuf;
656
657 #[test]
658 fn socket_path_deterministic() {
659 let root = PathBuf::from("/tmp/test-project");
660 let p1 = socket_path(&root);
661 let p2 = socket_path(&root);
662 assert_eq!(p1, p2, "same root must produce same socket path");
663 }
664
665 #[test]
666 fn socket_path_different_roots() {
667 let r1 = PathBuf::from("/tmp/project-a");
668 let r2 = PathBuf::from("/tmp/project-b");
669 assert_ne!(socket_path(&r1), socket_path(&r2));
670 }
671
672 #[test]
673 fn socket_path_uses_xdg() {
674 unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
675 let p = socket_path(Path::new("/tmp/some-project"));
676 assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
677 assert!(p.extension().is_some_and(|e| e == "sock"));
678 unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
679 }
680
681 #[test]
682 fn server_message_ndjson_roundtrip() {
683 let msg = ServerMessage::Status {
684 pid: 1234,
685 status: "idle".to_string(),
686 files: 42,
687 daemon_status: None,
688 };
689 let json = serde_json::to_string(&msg).expect("serialize");
690 assert!(json.contains("\"t\":\"status\""), "tag field present");
691 assert!(!json.contains("daemon_status"), "daemon_status should be omitted when None");
692
693 let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
694 if let ServerMessage::Status { pid, status, files, daemon_status } = back {
695 assert_eq!(pid, 1234);
696 assert_eq!(status, "idle");
697 assert_eq!(files, 42);
698 assert_eq!(daemon_status, None);
699 } else {
700 panic!("wrong variant after roundtrip");
701 }
702 }
703
704 #[test]
705 fn files_changed_roundtrip() {
706 let msg = ServerMessage::FilesChanged {
707 batch: vec![FileChange {
708 path: PathBuf::from("src/main.rs"),
709 mtime: 1_776_468_629,
710 op: FileOp::Modify,
711 }],
712 timestamp: 1_776_468_629,
713 };
714 let json = serde_json::to_string(&msg).expect("serialize");
715 let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
716 if let ServerMessage::FilesChanged { batch, timestamp } = back {
717 assert_eq!(batch.len(), 1);
718 assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
719 assert_eq!(timestamp, 1_776_468_629);
720 } else {
721 panic!("wrong variant");
722 }
723 }
724
725 #[test]
726 fn client_message_roundtrip() {
727 let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
728 let json = serde_json::to_string(&msg).expect("serialize");
729 let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
730 if let ClientMessage::HistoryQuery { since, id } = back {
731 assert_eq!(since, 1000);
732 assert_eq!(id, 7);
733 } else {
734 panic!("wrong variant");
735 }
736 }
737
738 #[test]
739 fn history_since() {
740 let mut h = History::new();
741 h.push(
742 100,
743 vec![FileChange {
744 path: PathBuf::from("a.rs"),
745 mtime: 100,
746 op: FileOp::Create,
747 }],
748 );
749 h.push(
750 200,
751 vec![FileChange {
752 path: PathBuf::from("b.rs"),
753 mtime: 200,
754 op: FileOp::Modify,
755 }],
756 );
757 h.push(
758 300,
759 vec![FileChange {
760 path: PathBuf::from("c.rs"),
761 mtime: 300,
762 op: FileOp::Delete,
763 }],
764 );
765
766 let changes = h.since(150);
767 assert_eq!(changes.len(), 2);
768 assert_eq!(changes[0].path, PathBuf::from("b.rs"));
769 assert_eq!(changes[1].path, PathBuf::from("c.rs"));
770 }
771
772 #[test]
773 fn history_capacity() {
774 let mut h = History::new();
775 for i in 0..=HISTORY_CAPACITY {
776 h.push(
777 i as u64,
778 vec![FileChange {
779 path: PathBuf::from(format!("f{i}")),
780 mtime: i as u64,
781 op: FileOp::Modify,
782 }],
783 );
784 }
785 assert_eq!(h.entries.len(), HISTORY_CAPACITY);
786 assert_eq!(h.entries.front().expect("non-empty").0, 1);
788 }
789
790 #[test]
791 fn server_client_connect_and_broadcast() {
792 let tmp = tempfile::tempdir().expect("tempdir");
793 let root = tmp.path().to_path_buf();
794
795 let mut server = DaemonServer::new(&root).expect("create server");
796 let sp = server.path().to_path_buf();
797 let _ = server.start();
798
799 let stream = UnixStream::connect(&sp).expect("connect");
801 let mut client = DaemonClient {
802 stream: BufReader::new(stream),
803 };
804
805 std::thread::sleep(std::time::Duration::from_millis(200));
807
808 server.set_status(&DaemonStatus::Idle, 10);
809
810 server.broadcast(&ServerMessage::Status {
812 pid: 1234,
813 status: "idle".to_string(),
814 files: 10,
815 daemon_status: Some(DaemonStatus::Idle),
816 });
817
818 client
821 .stream
822 .get_mut()
823 .set_read_timeout(Some(std::time::Duration::from_secs(2)))
824 .expect("set timeout");
825
826 match client.recv() {
827 Ok(ServerMessage::Status { pid, status, files, daemon_status }) => {
828 assert_eq!(pid, 1234);
829 assert_eq!(status, "idle");
830 assert_eq!(files, 10);
831 assert!(daemon_status.is_some());
832 }
833 Ok(other) => panic!("expected Status, got {other:?}"),
834 Err(e) => panic!("recv failed: {e}"),
835 }
836 }
837
838#[test]
839 fn client_query_status() {
840 let tmp = tempfile::tempdir().expect("tempdir");
841 let root = tmp.path().to_path_buf();
842
843 let mut server = DaemonServer::new(&root).expect("create server");
844 let sp = server.path().to_path_buf();
845 let _ = server.start();
846 server.set_status(&DaemonStatus::Indexing { entropy: 42 }, 99);
847
848 let stream = UnixStream::connect(&sp).expect("connect");
849 let mut client = DaemonClient {
850 stream: BufReader::new(stream),
851 };
852
853 std::thread::sleep(std::time::Duration::from_millis(200));
854
855 client
856 .send(&ClientMessage::StatusQuery { id: 123 })
857 .expect("send query");
858
859 client
860 .stream
861 .get_mut()
862 .set_read_timeout(Some(std::time::Duration::from_secs(2)))
863 .expect("set timeout");
864
865 match client.recv() {
866 Ok(ServerMessage::QueryResult {
867 id,
868 status,
869 files,
870 changes_since,
871 daemon_status,
872 last_rebuild_at,
873 }) => {
874 eprintln!("[JSON] id={}, status={}, files={}, daemon_status={:?}, last_rebuild_at={:?}",
875 id, status, files, daemon_status, last_rebuild_at);
876 assert_eq!(id, 123);
877 assert_eq!(status, "indexing (entropy: 42)");
878 assert_eq!(files, 99);
879 assert!(changes_since.is_empty());
880 assert_eq!(daemon_status, Some(DaemonStatus::Indexing { entropy: 42 }));
881 assert_eq!(last_rebuild_at, None);
882 }
883 Ok(other) => panic!("expected QueryResult, got {other:?}"),
884 Err(e) => panic!("recv failed: {e}"),
885 }
886 }
887}