1use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
/// Maximum number of timestamped change batches retained by the in-memory history.
const HISTORY_CAPACITY: usize = 1 << 10;
52
/// Kind of filesystem operation recorded for a changed path.
///
/// Serialized by serde as the lowercase variant name (`"create"`, `"modify"`,
/// `"delete"`, `"rename"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
    /// The path was created.
    Create,
    /// The path was modified.
    Modify,
    /// The path was deleted.
    Delete,
    /// The path was renamed.
    Rename,
}
66
67impl FileOp {
68 #[must_use]
70 pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71 match kind {
72 notify::EventKind::Create(_) => Self::Create,
73 notify::EventKind::Remove(_) => Self::Delete,
74 _ => Self::Modify,
75 }
76 }
77}
78
/// High-level daemon state reported to clients.
///
/// Serialized as an internally tagged JSON object: the variant name is stored in a
/// `"state"` field in `snake_case`, e.g. `{"state":"indexing","entropy":42}` or
/// `{"state":"safety_halt"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum DaemonStatus {
    /// Nothing in progress; this is the state a new server starts in.
    Idle,
    /// Indexing state.
    Indexing {
        /// NOTE(review): meaning and scale of `entropy` are not visible in this file.
        entropy: u16,
    },
    /// Deferred state.
    Deferred {
        /// NOTE(review): meaning and scale of `entropy` are not visible in this file.
        entropy: u16,
    },
    /// Escalated state.
    Escalated {
        /// NOTE(review): meaning and scale of `entropy` are not visible in this file.
        entropy: u16,
    },
    /// Warning state carrying a free-form explanation.
    Warned {
        /// Text shown after `warned: ` in the `Display` output.
        reason: String,
    },
    /// Displayed as `safety halt`.
    SafetyHalt,
    /// Displayed as `safety exit`.
    SafetyExit,
}
110
111impl std::fmt::Display for DaemonStatus {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self {
114 Self::Idle { .. } => write!(f, "idle"),
115 Self::Indexing { entropy } => write!(f, "indexing (entropy: {entropy})"),
116 Self::Deferred { entropy } => write!(f, "deferred (entropy: {entropy})"),
117 Self::Escalated { entropy } => write!(f, "escalated (entropy: {entropy})"),
118 Self::Warned { reason } => write!(f, "warned: {reason}"),
119 Self::SafetyHalt => write!(f, "safety halt"),
120 Self::SafetyExit => write!(f, "safety exit"),
121 }
122 }
123}
124
/// A single changed path as sent over the wire.
///
/// Field names are shortened in the JSON encoding (`p`, `m`, `o`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    /// Path of the changed file; serialized as `"p"`.
    #[serde(rename = "p")]
    pub path: PathBuf,
    /// Modification time; serialized as `"m"`.
    /// NOTE(review): presumably Unix seconds (the tests use such values) — confirm.
    #[serde(rename = "m")]
    pub mtime: u64,
    /// Operation that produced this change; serialized as `"o"`.
    #[serde(rename = "o")]
    pub op: FileOp,
}
138
/// Messages sent from the daemon to clients.
///
/// Internally tagged: the variant name is stored in a `"t"` field in `snake_case`
/// (`"status"`, `"files_changed"`, `"query_result"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
    /// Status snapshot.
    Status {
        /// Process id reported by the sender.
        pid: u32,
        /// Human-readable status text.
        status: String,
        /// File count reported alongside the status.
        files: usize,
        /// Structured status; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
    },
    /// Notification that a batch of files changed.
    FilesChanged {
        /// The changed files.
        batch: Vec<FileChange>,
        /// Timestamp attached to the batch; serialized as `"ts"`.
        #[serde(rename = "ts")]
        timestamp: u64,
    },
    /// Reply to a [`ClientMessage`] query.
    QueryResult {
        /// Echo of the `id` supplied in the query.
        id: u64,
        /// Human-readable status text.
        status: String,
        /// File count at the time of the reply.
        files: usize,
        /// Changes newer than the requested cutoff; omitted from the JSON when empty
        /// and defaulted to empty when absent on input.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        changes_since: Vec<FileChange>,
        /// Structured status; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
        /// Timestamp of the last recorded rebuild; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        last_rebuild_at: Option<u64>,
    },
}
182
/// Requests sent from a client to the daemon.
///
/// Internally tagged: the variant name is stored in a `"t"` field in `snake_case`
/// (`"status_query"`, `"history_query"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Ask for the current status.
    StatusQuery {
        /// Correlation id echoed back in the reply; defaults to `0` when absent.
        #[serde(default)]
        id: u64,
    },
    /// Ask for the changes recorded after a given timestamp.
    HistoryQuery {
        /// Exclusive lower bound on the batch timestamp.
        since: u64,
        /// Correlation id echoed back in the reply.
        // NOTE(review): unlike `StatusQuery::id` this field has no `#[serde(default)]`,
        // so it is required on input — confirm the asymmetry is intended.
        id: u64,
    },
}
201
/// Errors produced by the daemon socket server and client.
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
    /// Socket or filesystem I/O failure; converted automatically from `std::io::Error`.
    #[error("daemon socket I/O: {0}")]
    Io(#[from] std::io::Error),
    /// JSON (de)serialization failure; converted automatically from `serde_json::Error`.
    #[error("daemon socket JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The socket path could not be resolved.
    // NOTE(review): this variant is not constructed anywhere in the visible code —
    // confirm it is still needed.
    #[error("daemon socket path resolution failed")]
    PathResolution,
}
215
/// Module-local result alias whose error type is [`DaemonSockError`].
type Result<T> = std::result::Result<T, DaemonSockError>;
217
218#[must_use]
227pub fn socket_path(root: &Path) -> PathBuf {
228 let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
229 let hash = format!(
230 "{:016x}",
231 xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
232 );
233
234 if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
235 let dir = PathBuf::from(xdg).join("ixd");
236 return dir.join(format!("{hash}.sock"));
237 }
238
239 if let Ok(home) = std::env::var("HOME") {
240 let dir = PathBuf::from(home).join(".local/run/ixd");
241 return dir.join(format!("{hash}.sock"));
242 }
243
244 let uid = unsafe { libc::getuid() };
245 PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
246}
247
/// Creates the directory that will contain `path`, including missing ancestors.
///
/// Succeeds without doing anything when `path` has no parent component.
///
/// # Errors
///
/// Returns the error reported by `std::fs::create_dir_all`.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir),
        None => Ok(()),
    }
}
255
256struct History {
258 entries: VecDeque<(u64, Vec<FileChange>)>,
259}
260
261impl History {
262 fn new() -> Self {
263 Self {
264 entries: VecDeque::with_capacity(HISTORY_CAPACITY),
265 }
266 }
267
268 fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
269 if self.entries.len() >= HISTORY_CAPACITY {
270 self.entries.pop_front();
271 }
272 self.entries.push_back((timestamp, changes));
273 }
274
275 fn since(&self, cutoff: u64) -> Vec<FileChange> {
276 self.entries
277 .iter()
278 .filter(|(ts, _)| *ts > cutoff)
279 .flat_map(|(_, changes)| changes.iter().cloned())
280 .collect()
281 }
282}
283
/// Mutable server state, kept behind a mutex and shared between the server handle,
/// the accept thread and the per-client reader threads.
struct Shared {
    /// Connections used to push messages to clients; an entry is removed when a send
    /// to it fails.
    clients: Vec<ClientConn>,
    /// Recent change batches, served to history queries.
    history: History,
    /// Human-readable status text included in query replies.
    status: String,
    /// Structured status included in query replies.
    daemon_status: Option<DaemonStatus>,
    /// Timestamp stored by `notify_changes` while the daemon status is `Idle`.
    last_rebuild_at: Option<u64>,
    /// Most recently reported file count.
    files_count: usize,
}
293
294struct ClientConn {
295 stream: UnixStream,
296}
297
298impl ClientConn {
299 fn send(&mut self, msg: &ServerMessage) -> bool {
300 let Ok(mut line) = serde_json::to_string(msg) else {
301 return false;
302 };
303 line.push('\n');
304 self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
305 }
306}
307
/// Unix-socket server that pushes newline-delimited JSON messages to connected
/// clients and answers their queries.
///
/// Dropping the server stops the accept thread and removes the socket file.
pub struct DaemonServer {
    /// State shared with the accept thread and the per-client reader threads.
    shared: Arc<Mutex<Shared>>,
    /// Listening socket bound in `new`.
    listener: UnixListener,
    /// Filesystem path of the bound socket.
    socket_path: PathBuf,
    /// Handle of the accept thread; `None` until `start` has been called.
    accept_handle: Option<std::thread::JoinHandle<()>>,
    /// Cleared on drop to ask the background threads to stop.
    running: Arc<std::sync::atomic::AtomicBool>,
}
319
320impl DaemonServer {
321 pub fn new(root: &Path) -> Result<Self> {
331 let sp = socket_path(root);
332 ensure_socket_dir(&sp)?;
333
334 if sp.exists() || sp.is_symlink() {
335 let msg = if sp.is_symlink() {
336 format!("symlink attack detected at {}", sp.display())
337 } else {
338 format!("socket file already exists at {}", sp.display())
339 };
340 return Err(DaemonSockError::Io(std::io::Error::new(
341 std::io::ErrorKind::AddrInUse,
342 msg,
343 )));
344 }
345
346 let listener = UnixListener::bind(&sp)?;
347
348 let shared = Arc::new(Mutex::new(Shared {
349 clients: Vec::new(),
350 history: History::new(),
351 status: "idle".to_string(),
352 daemon_status: Some(DaemonStatus::Idle),
353 last_rebuild_at: None,
354 files_count: 0,
355 }));
356 let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
357
358 Ok(Self {
359 shared,
360 listener,
361 socket_path: sp,
362 accept_handle: None,
363 running,
364 })
365 }
366
367 #[must_use]
369 pub fn path(&self) -> &Path {
370 &self.socket_path
371 }
372
373 pub fn start(&mut self) -> Result<()> {
384 let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
385 let shared = Arc::clone(&self.shared);
386 let running = Arc::clone(&self.running);
387
388 let handle = std::thread::Builder::new()
389 .name("ixd-sock-accept".to_string())
390 .spawn(move || {
391 if let Err(e) = listener.set_nonblocking(true) {
392 tracing::error!("ixd: cannot set nonblocking: {e}");
393 return;
394 }
395
396 while running.load(std::sync::atomic::Ordering::SeqCst) {
397 match listener.accept() {
398 Ok((stream, _)) => {
399 if let Err(e) = stream.set_nonblocking(false) {
400 tracing::warn!("ixd: cannot set blocking on client: {e}");
401 continue;
402 }
403 let _ =
404 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
405 let read_stream = match stream.try_clone() {
406 Ok(s) => s,
407 Err(e) => {
408 tracing::warn!("ixd: cannot clone stream: {e}");
409 continue;
410 }
411 };
412 let shared_clone = Arc::clone(&shared);
413 let running_clone = Arc::clone(&running);
414 if let Err(e) = std::thread::Builder::new()
415 .name("ixd-sock-client".to_string())
416 .spawn(move || {
417 client_read_loop(&read_stream, &shared_clone, &running_clone);
418 })
419 {
420 tracing::warn!("ixd: failed to spawn client thread: {e}");
421 continue;
422 }
423 let conn = ClientConn { stream };
424 if let Ok(mut s) = shared.lock() {
425 s.clients.push(conn);
426 }
427 }
428 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
429 std::thread::sleep(std::time::Duration::from_millis(100));
430 }
431 Err(e) => {
432 tracing::warn!("ixd: accept error: {e}");
433 std::thread::sleep(std::time::Duration::from_millis(200));
434 }
435 }
436 }
437 })
438 .map_err(DaemonSockError::Io)?;
439
440 self.accept_handle = Some(handle);
441 Ok(())
442 }
443
444 pub fn broadcast(&self, msg: &ServerMessage) {
450 let Ok(mut s) = self.shared.lock() else {
451 return;
452 };
453 s.clients.retain_mut(|c| c.send(msg));
454 }
455
456 pub fn set_status(&self, daemon_status: &DaemonStatus, files_count: usize) {
459 if let Ok(mut s) = self.shared.lock() {
460 s.status = daemon_status.to_string();
461 s.daemon_status = Some(daemon_status.clone());
462 s.files_count = files_count;
463 }
464 }
465
466 pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
468 let timestamp = now_secs();
469 if let Ok(mut s) = self.shared.lock() {
470 s.history.push(timestamp, changes.clone());
471 s.files_count = files_count;
472 if matches!(s.daemon_status, Some(DaemonStatus::Idle)) {
473 s.status = "idle".to_string();
474 s.daemon_status = Some(DaemonStatus::Idle);
475 s.last_rebuild_at = Some(timestamp);
476 }
477 let msg = ServerMessage::FilesChanged {
478 batch: changes,
479 timestamp,
480 };
481 s.clients.retain_mut(|c| c.send(&msg));
482 }
483 }
484}
485fn client_read_loop(
486 stream: &UnixStream,
487 shared: &Arc<Mutex<Shared>>,
488 running: &Arc<std::sync::atomic::AtomicBool>,
489) {
490 let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
491 let mut reader = BufReader::new(stream);
492 let mut line_buf = String::new();
493
494 loop {
495 if !running.load(std::sync::atomic::Ordering::SeqCst) {
496 break;
497 }
498 line_buf.clear();
499 match reader.read_line(&mut line_buf) {
500 Ok(0) => break,
501 Ok(_) => {
502 let msg: ClientMessage = match serde_json::from_str(&line_buf) {
503 Ok(m) => m,
504 Err(e) => {
505 tracing::debug!("ixd: malformed client message: {e}");
506 continue;
507 }
508 };
509
510 let response = match msg {
511 ClientMessage::StatusQuery { id } => {
512 let Ok(s) = shared.lock() else {
513 tracing::warn!("ixd: shared lock poisoned in status query");
514 continue;
515 };
516 ServerMessage::QueryResult {
517 id,
518 status: s.status.clone(),
519 files: s.files_count,
520 changes_since: Vec::new(),
521 daemon_status: s.daemon_status.clone(),
522 last_rebuild_at: s.last_rebuild_at,
523 }
524 }
525 ClientMessage::HistoryQuery { since, id } => {
526 let Ok(s) = shared.lock() else {
527 tracing::warn!("ixd: shared lock poisoned in history query");
528 continue;
529 };
530 let changes = s.history.since(since);
531 ServerMessage::QueryResult {
532 id,
533 status: s.status.clone(),
534 files: s.files_count,
535 changes_since: changes,
536 daemon_status: s.daemon_status.clone(),
537 last_rebuild_at: s.last_rebuild_at,
538 }
539 }
540 };
541
542 if let Ok(mut write_stream) = stream.try_clone() {
543 match serde_json::to_string(&response) {
544 Ok(mut line) => {
545 line.push('\n');
546 if write_stream.write_all(line.as_bytes()).is_err()
547 || write_stream.flush().is_err()
548 {
549 break;
550 }
551 }
552 Err(e) => {
553 tracing::warn!("ixd: failed to serialize query response: {e}");
554 break;
555 }
556 }
557 }
558 }
559 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
560 Err(_) => break,
561 }
562 }
563}
564
565pub struct DaemonClient {
567 stream: BufReader<UnixStream>,
568}
569
570impl DaemonClient {
571 pub fn connect(root: &Path) -> Result<Self> {
577 let sp = socket_path(root);
578 let stream = UnixStream::connect(&sp)?;
579 stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
580 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
581 Ok(Self {
582 stream: BufReader::new(stream),
583 })
584 }
585
586 pub fn recv(&mut self) -> Result<ServerMessage> {
592 let mut line = String::new();
593 let bytes = self.stream.read_line(&mut line).map_err(|e| {
594 if e.kind() == std::io::ErrorKind::TimedOut {
595 DaemonSockError::Io(std::io::Error::new(
596 std::io::ErrorKind::TimedOut,
597 "recv timed out after 5s",
598 ))
599 } else {
600 DaemonSockError::Io(e)
601 }
602 })?;
603 if bytes == 0 {
604 return Err(DaemonSockError::Io(std::io::Error::new(
605 std::io::ErrorKind::UnexpectedEof,
606 "daemon closed connection",
607 )));
608 }
609 let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
610 DaemonSockError::Io(std::io::Error::new(
611 std::io::ErrorKind::InvalidData,
612 format!("invalid JSON: {e}"),
613 ))
614 })?;
615 Ok(msg)
616 }
617
618 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
624 let stream = self.stream.get_mut();
625 let mut line = serde_json::to_string(msg)?;
626 line.push('\n');
627 stream.write_all(line.as_bytes())?;
628 stream.flush()?;
629 Ok(())
630 }
631}
632
/// Returns the current wall-clock time as whole seconds since the Unix epoch, or `0`
/// if the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
640
641impl Drop for DaemonServer {
642 fn drop(&mut self) {
643 self.running
644 .store(false, std::sync::atomic::Ordering::SeqCst);
645 if let Some(handle) = self.accept_handle.take() {
646 let _ = handle.join();
647 }
648 let _ = std::fs::remove_file(&self.socket_path);
649 }
650}
651
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Serializes tests that read or mutate process-wide environment variables; the
    /// test harness runs tests on multiple threads by default.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the environment lock, recovering it if another test panicked while
    /// holding it.
    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Small helper for building a one-file change batch.
    fn change(path: &str, mtime: u64, op: FileOp) -> FileChange {
        FileChange {
            path: PathBuf::from(path),
            mtime,
            op,
        }
    }

    #[test]
    fn socket_path_deterministic() {
        // Hold the lock so the environment cannot change between the two calls.
        let _env = env_guard();
        let root = PathBuf::from("/tmp/test-project");
        let p1 = socket_path(&root);
        let p2 = socket_path(&root);
        assert_eq!(p1, p2, "same root must produce same socket path");
    }

    #[test]
    fn socket_path_different_roots() {
        let _env = env_guard();
        let r1 = PathBuf::from("/tmp/project-a");
        let r2 = PathBuf::from("/tmp/project-b");
        assert_ne!(socket_path(&r1), socket_path(&r2));
    }

    #[test]
    fn socket_path_uses_xdg() {
        let _env = env_guard();
        let previous = std::env::var_os("XDG_RUNTIME_DIR");

        // SAFETY: every test that touches the environment holds `ENV_LOCK`.
        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
        let p = socket_path(Path::new("/tmp/some-project"));

        // Restore the original value before asserting so a failed assertion cannot
        // leak the override into other tests.
        // SAFETY: see above.
        unsafe {
            match previous {
                Some(value) => std::env::set_var("XDG_RUNTIME_DIR", value),
                None => std::env::remove_var("XDG_RUNTIME_DIR"),
            }
        }

        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
        assert!(p.extension().is_some_and(|e| e == "sock"));
    }

    #[test]
    fn server_message_ndjson_roundtrip() {
        let msg = ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 42,
            daemon_status: None,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        assert!(json.contains("\"t\":\"status\""), "tag field present");
        assert!(
            !json.contains("daemon_status"),
            "daemon_status should be omitted when None"
        );

        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::Status {
            pid,
            status,
            files,
            daemon_status,
        } = back
        {
            assert_eq!(pid, 1234);
            assert_eq!(status, "idle");
            assert_eq!(files, 42);
            assert_eq!(daemon_status, None);
        } else {
            panic!("wrong variant after roundtrip");
        }
    }

    #[test]
    fn files_changed_roundtrip() {
        let msg = ServerMessage::FilesChanged {
            batch: vec![change("src/main.rs", 1_776_468_629, FileOp::Modify)],
            timestamp: 1_776_468_629,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::FilesChanged { batch, timestamp } = back {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
            assert_eq!(timestamp, 1_776_468_629);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn client_message_roundtrip() {
        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
        if let ClientMessage::HistoryQuery { since, id } = back {
            assert_eq!(since, 1000);
            assert_eq!(id, 7);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn history_since() {
        let mut h = History::new();
        h.push(100, vec![change("a.rs", 100, FileOp::Create)]);
        h.push(200, vec![change("b.rs", 200, FileOp::Modify)]);
        h.push(300, vec![change("c.rs", 300, FileOp::Delete)]);

        let changes = h.since(150);
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
    }

    #[test]
    fn history_capacity() {
        let mut h = History::new();
        // One more push than the capacity, so exactly the first batch is evicted.
        for i in 0..=HISTORY_CAPACITY {
            h.push(
                i as u64,
                vec![change(&format!("f{i}"), i as u64, FileOp::Modify)],
            );
        }
        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
        assert_eq!(h.entries.front().expect("non-empty").0, 1);
    }

    #[test]
    fn server_client_connect_and_broadcast() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = {
            // The socket location depends on the environment; keep it stable while
            // the path is computed.
            let _env = env_guard();
            DaemonServer::new(&root).expect("create server")
        };
        let sp = server.path().to_path_buf();
        server.start().expect("start server");

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept thread (which polls every 100 ms) time to register the
        // client before broadcasting.
        std::thread::sleep(std::time::Duration::from_millis(200));

        server.set_status(&DaemonStatus::Idle, 10);

        server.broadcast(&ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 10,
            daemon_status: Some(DaemonStatus::Idle),
        });

        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::Status {
                pid,
                status,
                files,
                daemon_status,
            }) => {
                assert_eq!(pid, 1234);
                assert_eq!(status, "idle");
                assert_eq!(files, 10);
                assert!(daemon_status.is_some());
            }
            Ok(other) => panic!("expected Status, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }

    #[test]
    fn client_query_status() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = {
            let _env = env_guard();
            DaemonServer::new(&root).expect("create server")
        };
        let sp = server.path().to_path_buf();
        server.start().expect("start server");
        server.set_status(&DaemonStatus::Indexing { entropy: 42 }, 99);

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        std::thread::sleep(std::time::Duration::from_millis(200));

        client
            .send(&ClientMessage::StatusQuery { id: 123 })
            .expect("send query");

        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::QueryResult {
                id,
                status,
                files,
                changes_since,
                daemon_status,
                last_rebuild_at,
            }) => {
                assert_eq!(id, 123);
                assert_eq!(status, "indexing (entropy: 42)");
                assert_eq!(files, 99);
                assert!(changes_since.is_empty());
                assert_eq!(daemon_status, Some(DaemonStatus::Indexing { entropy: 42 }));
                assert_eq!(last_rebuild_at, None);
            }
            Ok(other) => panic!("expected QueryResult, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }
}