Skip to main content

agent_doc/
ipc_socket.rs

1//! Socket-based IPC for editor plugin communication.
2//!
3//! Uses Unix domain sockets (Linux/macOS) or Windows named pipes via the
4//! `interprocess` crate. The socket replaces the file-based IPC mechanism
5//! (NIO WatchService + patch files) for lower latency and no inotify issues.
6//!
7//! ## Architecture
8//!
9//! - **Listener** (plugin side): The editor plugin starts a socket listener
10//!   at `.agent-doc/ipc.sock`. It accepts connections and processes JSON messages.
11//! - **Sender** (CLI side): The `agent-doc write` command connects to the socket
12//!   and sends patch JSON. Falls back to file-based IPC if socket unavailable.
13//!
14//! ## Protocol
15//!
16//! Messages are newline-delimited JSON (NDJSON). Each message is a single line
17//! terminated by `\n`. The receiver reads lines and parses each as JSON.
18//!
19//! Message types:
20//! - `{"type": "patch", "file": "...", "patches": [...], "frontmatter": "..."}` — apply patches
21//! - `{"type": "reposition", "file": "..."}` — reposition boundary marker
22//! - `{"type": "vcs_refresh"}` — trigger VCS refresh
23//! - `{"type": "ack", "id": "..."}` — acknowledgment from plugin
24
25use anyhow::{Context, Result};
26use interprocess::local_socket::{
27    GenericFilePath, ListenerOptions, ToFsName,
28    traits::{Listener as _, Stream as _},
29};
30use std::io::{BufRead, BufReader, Write};
31use std::path::{Path, PathBuf};
32use std::time::Duration;
33
/// Name of the socket file created inside the project's `.agent-doc/` directory.
const SOCKET_FILENAME: &str = "ipc.sock";

/// Build the IPC socket path for a project.
///
/// The result is `<project_root>/.agent-doc/ipc.sock`. This is pure path
/// arithmetic: nothing is created or checked on disk.
pub fn socket_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.join(".agent-doc");
    path.push(SOCKET_FILENAME);
    path
}
41
42/// Check if a socket listener is active.
43pub fn is_listener_active(project_root: &Path) -> bool {
44    let sock = socket_path(project_root);
45    if !sock.exists() {
46        return false;
47    }
48    // Try connecting — if it succeeds, the listener is active
49    match try_connect(project_root) {
50        Ok(_) => true,
51        Err(_) => {
52            // Stale socket file — clean it up
53            let _ = std::fs::remove_file(&sock);
54            false
55        }
56    }
57}
58
59/// Connect to the socket. Returns a stream for sending messages.
60fn try_connect(project_root: &Path) -> Result<interprocess::local_socket::Stream> {
61    let path = socket_path(project_root);
62    let name = path.to_fs_name::<GenericFilePath>()?;
63    let opts = interprocess::local_socket::ConnectOptions::new().name(name);
64    let stream = opts.connect_sync()
65        .context("failed to connect to IPC socket")?;
66    Ok(stream)
67}
68
69/// Send a JSON message to the plugin via socket IPC.
70/// Returns Ok(response) if the plugin acknowledges, Err if socket unavailable.
71pub fn send_message(project_root: &Path, message: &serde_json::Value) -> Result<Option<String>> {
72    let stream = try_connect(project_root)?;
73
74    // interprocess Stream implements Read + Write via halves
75    let (reader_half, mut writer_half) = stream.split();
76
77    // Send NDJSON message
78    let mut msg = serde_json::to_string(message)?;
79    msg.push('\n');
80    writer_half.write_all(msg.as_bytes())?;
81    writer_half.flush()?;
82
83    // Read ack (with manual timeout via thread)
84    let (tx, rx) = std::sync::mpsc::channel();
85    std::thread::spawn(move || {
86        let mut reader = BufReader::new(reader_half);
87        let mut ack_line = String::new();
88        let result = reader.read_line(&mut ack_line);
89        let _ = tx.send((result, ack_line));
90    });
91
92    match rx.recv_timeout(Duration::from_secs(2)) {
93        Ok((Ok(0), _)) => Err(anyhow::anyhow!("IPC ack: plugin closed connection without responding")),
94        Ok((Ok(_), line)) => Ok(Some(line.trim().to_string())),
95        Ok((Err(e), _)) => Err(anyhow::anyhow!("IPC ack read error: {}", e)),
96        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(anyhow::anyhow!("IPC ack timeout (2s)")),
97        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Err(anyhow::anyhow!("IPC reader thread disconnected")),
98    }
99}
100
101/// Send a patch message to the plugin.
102pub fn send_patch(
103    project_root: &Path,
104    file: &str,
105    patches_json: &str,
106    frontmatter_yaml: Option<&str>,
107) -> Result<bool> {
108    let message = serde_json::json!({
109        "type": "patch",
110        "file": file,
111        "patches": serde_json::from_str::<serde_json::Value>(patches_json)?,
112        "frontmatter": frontmatter_yaml,
113    });
114
115    match send_message(project_root, &message) {
116        Ok(Some(ack)) => {
117            eprintln!("[ipc-socket] patch sent, ack: {}", ack);
118            Ok(true)
119        }
120        Ok(None) => {
121            eprintln!("[ipc-socket] patch sent, no ack");
122            Ok(true)
123        }
124        Err(e) => Err(e),
125    }
126}
127
128/// Send a reposition boundary message.
129pub fn send_reposition(project_root: &Path, file: &str) -> Result<bool> {
130    let message = serde_json::json!({
131        "type": "reposition",
132        "file": file,
133    });
134
135    send_message(project_root, &message).map(|_| true)
136}
137
138/// Send a VCS refresh signal.
139pub fn send_vcs_refresh(project_root: &Path) -> Result<bool> {
140    let message = serde_json::json!({
141        "type": "vcs_refresh",
142    });
143
144    send_message(project_root, &message).map(|_| true)
145}
146
147/// Start a socket listener (for use by the FFI library / plugin).
148/// This blocks the calling thread — run it on a background thread.
149#[allow(unreachable_code)]
150pub fn start_listener<F>(project_root: &Path, handler: F) -> Result<()>
151where
152    F: Fn(&str) -> Option<String> + Send + 'static,
153{
154    let sock_path = socket_path(project_root);
155
156    // Clean up stale socket
157    if sock_path.exists() {
158        let _ = std::fs::remove_file(&sock_path);
159    }
160
161    // Ensure parent directory exists
162    if let Some(parent) = sock_path.parent() {
163        std::fs::create_dir_all(parent)?;
164    }
165
166    eprintln!("[ipc-socket] listening on {:?}", sock_path);
167
168    let name = sock_path.to_fs_name::<GenericFilePath>()?;
169    let opts = ListenerOptions::new().name(name);
170    let listener = opts.create_sync()?;
171
172    loop {
173        match listener.accept() {
174            Ok(stream) => {
175                let (reader_half, mut writer_half) = stream.split();
176                let mut reader = BufReader::new(reader_half);
177                let mut line = String::new();
178
179                while reader.read_line(&mut line).unwrap_or(0) > 0 {
180                    let trimmed = line.trim();
181                    if !trimmed.is_empty()
182                        && let Some(response) = handler(trimmed)
183                    {
184                        let mut resp = response;
185                        resp.push('\n');
186                        if let Err(e) = writer_half.write_all(resp.as_bytes()) {
187                            eprintln!("[ipc-socket] handler write error: {}", e);
188                        }
189                        if let Err(e) = writer_half.flush() {
190                            eprintln!("[ipc-socket] handler flush error: {}", e);
191                        }
192                    }
193                    line.clear();
194                }
195            }
196            Err(e) => {
197                eprintln!("[ipc-socket] accept error: {}", e);
198            }
199        }
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// Poll until the listener under `root` accepts a connection, or until
    /// `timeout` elapses. Each probe connection is dropped immediately, which
    /// the listener sees as an empty, closed connection.
    fn wait_for_listener(root: &Path, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if try_connect(root).is_ok() {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// A message sent through the socket reaches the handler and its reply
    /// comes back as the ack.
    #[test]
    fn socket_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        std::fs::create_dir_all(root.join(".agent-doc")).unwrap();

        let root_clone = root.clone();
        let server = thread::spawn(move || {
            start_listener(&root_clone, |msg| {
                let v: serde_json::Value = serde_json::from_str(msg).ok()?;
                Some(serde_json::json!({"type": "ack", "id": v["type"]}).to_string())
            })
            .ok();
        });

        // Wait for the server to come up instead of relying on a fixed sleep,
        // which is flaky on slow or heavily loaded machines.
        assert!(
            wait_for_listener(&root, Duration::from_secs(2)),
            "listener did not start in time"
        );

        // Send a message
        let msg = serde_json::json!({"type": "vcs_refresh"});
        let result = send_message(&root, &msg).unwrap();
        assert!(result.is_some());
        let ack: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
        assert_eq!(ack["type"], "ack");
        assert_eq!(ack["id"], "vcs_refresh");

        // Clean up the socket file. This does not stop the listener: its
        // accept loop never exits, so the thread is detached here and ends
        // with the test process.
        let _ = std::fs::remove_file(socket_path(&root));
        drop(server);
    }
}