1use anyhow::{Context, Result};
26use interprocess::local_socket::{
27 GenericFilePath, ListenerOptions, ToFsName,
28 traits::{Listener as _, Stream as _},
29};
30use std::io::{BufRead, BufReader, Write};
31use std::path::{Path, PathBuf};
32use std::time::Duration;
33
/// File name of the IPC socket created inside the project's `.agent-doc` directory.
const SOCKET_FILENAME: &str = "ipc.sock";
36
37pub fn socket_path(project_root: &Path) -> PathBuf {
39 project_root.join(".agent-doc").join(SOCKET_FILENAME)
40}
41
42pub fn is_listener_active(project_root: &Path) -> bool {
44 let sock = socket_path(project_root);
45 if !sock.exists() {
46 return false;
47 }
48 match try_connect(project_root) {
50 Ok(_) => true,
51 Err(_) => {
52 let _ = std::fs::remove_file(&sock);
54 false
55 }
56 }
57}
58
59fn try_connect(project_root: &Path) -> Result<interprocess::local_socket::Stream> {
61 let path = socket_path(project_root);
62 let name = path.to_fs_name::<GenericFilePath>()?;
63 let opts = interprocess::local_socket::ConnectOptions::new().name(name);
64 let stream = opts.connect_sync()
65 .context("failed to connect to IPC socket")?;
66 Ok(stream)
67}
68
69pub fn send_message(project_root: &Path, message: &serde_json::Value) -> Result<Option<String>> {
72 let stream = try_connect(project_root)?;
73
74 let (reader_half, mut writer_half) = stream.split();
76
77 let mut msg = serde_json::to_string(message)?;
79 msg.push('\n');
80 writer_half.write_all(msg.as_bytes())?;
81 writer_half.flush()?;
82
83 let (tx, rx) = std::sync::mpsc::channel();
85 std::thread::spawn(move || {
86 let mut reader = BufReader::new(reader_half);
87 let mut ack_line = String::new();
88 let result = reader.read_line(&mut ack_line);
89 let _ = tx.send((result, ack_line));
90 });
91
92 match rx.recv_timeout(Duration::from_secs(2)) {
93 Ok((Ok(0), _)) => Err(anyhow::anyhow!("IPC ack: plugin closed connection without responding")),
94 Ok((Ok(_), line)) => Ok(Some(line.trim().to_string())),
95 Ok((Err(e), _)) => Err(anyhow::anyhow!("IPC ack read error: {}", e)),
96 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(anyhow::anyhow!("IPC ack timeout (2s)")),
97 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Err(anyhow::anyhow!("IPC reader thread disconnected")),
98 }
99}
100
101pub fn send_patch(
103 project_root: &Path,
104 file: &str,
105 patches_json: &str,
106 frontmatter_yaml: Option<&str>,
107) -> Result<bool> {
108 let message = serde_json::json!({
109 "type": "patch",
110 "file": file,
111 "patches": serde_json::from_str::<serde_json::Value>(patches_json)?,
112 "frontmatter": frontmatter_yaml,
113 });
114
115 match send_message(project_root, &message) {
116 Ok(Some(ack)) => {
117 eprintln!("[ipc-socket] patch sent, ack: {}", ack);
118 Ok(true)
119 }
120 Ok(None) => {
121 eprintln!("[ipc-socket] patch sent, no ack");
122 Ok(true)
123 }
124 Err(e) => Err(e),
125 }
126}
127
128pub fn send_reposition(project_root: &Path, file: &str) -> Result<bool> {
130 let message = serde_json::json!({
131 "type": "reposition",
132 "file": file,
133 });
134
135 send_message(project_root, &message).map(|_| true)
136}
137
138pub fn send_vcs_refresh(project_root: &Path) -> Result<bool> {
140 let message = serde_json::json!({
141 "type": "vcs_refresh",
142 });
143
144 send_message(project_root, &message).map(|_| true)
145}
146
147#[allow(unreachable_code)]
150pub fn start_listener<F>(project_root: &Path, handler: F) -> Result<()>
151where
152 F: Fn(&str) -> Option<String> + Send + 'static,
153{
154 let sock_path = socket_path(project_root);
155
156 if sock_path.exists() {
158 let _ = std::fs::remove_file(&sock_path);
159 }
160
161 if let Some(parent) = sock_path.parent() {
163 std::fs::create_dir_all(parent)?;
164 }
165
166 eprintln!("[ipc-socket] listening on {:?}", sock_path);
167
168 let name = sock_path.to_fs_name::<GenericFilePath>()?;
169 let opts = ListenerOptions::new().name(name);
170 let listener = opts.create_sync()?;
171
172 loop {
173 match listener.accept() {
174 Ok(stream) => {
175 let (reader_half, mut writer_half) = stream.split();
176 let mut reader = BufReader::new(reader_half);
177 let mut line = String::new();
178
179 while reader.read_line(&mut line).unwrap_or(0) > 0 {
180 let trimmed = line.trim();
181 if !trimmed.is_empty()
182 && let Some(response) = handler(trimmed)
183 {
184 let mut resp = response;
185 resp.push('\n');
186 if let Err(e) = writer_half.write_all(resp.as_bytes()) {
187 eprintln!("[ipc-socket] handler write error: {}", e);
188 }
189 if let Err(e) = writer_half.flush() {
190 eprintln!("[ipc-socket] handler flush error: {}", e);
191 }
192 }
193 line.clear();
194 }
195 }
196 Err(e) => {
197 eprintln!("[ipc-socket] accept error: {}", e);
198 }
199 }
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Starts a listener on a temporary project root, sends one message and
    /// checks that the handler's acknowledgement comes back intact.
    #[test]
    fn socket_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let project = tmp.path().to_path_buf();
        std::fs::create_dir_all(project.join(".agent-doc")).unwrap();

        // The listener blocks forever, so it runs on its own thread. The
        // handler echoes the incoming message's "type" back as the ack "id".
        let listener_root = project.clone();
        let listener_thread = thread::spawn(move || {
            start_listener(&listener_root, |raw| {
                let parsed: serde_json::Value = serde_json::from_str(raw).ok()?;
                Some(serde_json::json!({"type": "ack", "id": parsed["type"]}).to_string())
            })
            .ok();
        });

        // Presumably gives the listener thread time to bind before connecting.
        thread::sleep(Duration::from_millis(100));

        let request = serde_json::json!({"type": "vcs_refresh"});
        let reply = send_message(&project, &request).unwrap();
        assert!(reply.is_some());

        let ack: serde_json::Value = serde_json::from_str(&reply.unwrap()).unwrap();
        assert_eq!(ack["type"], "ack");
        assert_eq!(ack["id"], "vcs_refresh");

        // Best-effort cleanup; dropping the handle detaches the listener thread.
        let _ = std::fs::remove_file(socket_path(&project));
        drop(listener_thread);
    }
}