Skip to main content

agent_doc/
ipc_socket.rs

1//! Socket-based IPC for editor plugin communication.
2//!
3//! Uses Unix domain sockets (Linux/macOS) or Windows named pipes via the
4//! `interprocess` crate. The socket replaces the file-based IPC mechanism
5//! (NIO WatchService + patch files) for lower latency and no inotify issues.
6//!
7//! ## Architecture
8//!
9//! - **Listener** (plugin side): The editor plugin starts a socket listener
10//!   at `.agent-doc/ipc.sock`. It accepts connections and processes JSON messages.
11//! - **Sender** (CLI side): The `agent-doc write` command connects to the socket
12//!   and sends patch JSON. Falls back to file-based IPC if socket unavailable.
13//!
14//! ## Protocol
15//!
16//! Messages are newline-delimited JSON (NDJSON). Each message is a single line
17//! terminated by `\n`. The receiver reads lines and parses each as JSON.
18//!
19//! Message types:
20//! - `{"type": "patch", "file": "...", "patches": [...], "frontmatter": "..."}` — apply patches
21//! - `{"type": "reposition", "file": "..."}` — reposition boundary marker
22//! - `{"type": "vcs_refresh"}` — trigger VCS refresh
23//! - `{"type": "ack", "id": "..."}` — acknowledgment from plugin
24
25use anyhow::{Context, Result};
26use interprocess::local_socket::{
27    GenericFilePath, ListenerOptions, ToFsName,
28    traits::{Listener as _, Stream as _},
29};
30use std::io::{BufRead, BufReader, Write};
31use std::path::{Path, PathBuf};
32use std::time::Duration;
33
/// File name of the IPC socket inside a project's `.agent-doc/` directory.
const SOCKET_FILENAME: &str = "ipc.sock";

/// Build the IPC socket path for the project rooted at `project_root`.
///
/// The result is `<project_root>/.agent-doc/ipc.sock`. This is pure path
/// arithmetic: the file system is not consulted and nothing is created.
pub fn socket_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".agent-doc");
    path.push(SOCKET_FILENAME);
    path
}
41
42/// Check if a socket listener is active.
43pub fn is_listener_active(project_root: &Path) -> bool {
44    let sock = socket_path(project_root);
45    if !sock.exists() {
46        return false;
47    }
48    // Try connecting — if it succeeds, the listener is active
49    match try_connect(project_root) {
50        Ok(_) => true,
51        Err(_) => {
52            // Stale socket file — clean it up
53            let _ = std::fs::remove_file(&sock);
54            false
55        }
56    }
57}
58
59/// Connect to the socket. Returns a stream for sending messages.
60fn try_connect(project_root: &Path) -> Result<interprocess::local_socket::Stream> {
61    let path = socket_path(project_root);
62    let name = path.to_fs_name::<GenericFilePath>()?;
63    let opts = interprocess::local_socket::ConnectOptions::new().name(name);
64    let stream = opts.connect_sync()
65        .context("failed to connect to IPC socket")?;
66    Ok(stream)
67}
68
69/// Send a JSON message to the plugin via socket IPC.
70/// Returns Ok(response) if the plugin acknowledges, Err if socket unavailable.
71pub fn send_message(project_root: &Path, message: &serde_json::Value) -> Result<Option<String>> {
72    let stream = try_connect(project_root)?;
73
74    // interprocess Stream implements Read + Write via halves
75    let (reader_half, mut writer_half) = stream.split();
76
77    // Send NDJSON message
78    let mut msg = serde_json::to_string(message)?;
79    msg.push('\n');
80    writer_half.write_all(msg.as_bytes())?;
81    writer_half.flush()?;
82
83    // Read ack (with manual timeout via thread)
84    let (tx, rx) = std::sync::mpsc::channel();
85    std::thread::spawn(move || {
86        let mut reader = BufReader::new(reader_half);
87        let mut ack_line = String::new();
88        let result = reader.read_line(&mut ack_line);
89        let _ = tx.send((result, ack_line));
90    });
91
92    match rx.recv_timeout(Duration::from_secs(2)) {
93        Ok((Ok(0), _)) => Ok(None),
94        Ok((Ok(_), line)) => Ok(Some(line.trim().to_string())),
95        _ => Ok(None),
96    }
97}
98
99/// Send a patch message to the plugin.
100pub fn send_patch(
101    project_root: &Path,
102    file: &str,
103    patches_json: &str,
104    frontmatter_yaml: Option<&str>,
105) -> Result<bool> {
106    let message = serde_json::json!({
107        "type": "patch",
108        "file": file,
109        "patches": serde_json::from_str::<serde_json::Value>(patches_json)?,
110        "frontmatter": frontmatter_yaml,
111    });
112
113    match send_message(project_root, &message) {
114        Ok(Some(ack)) => {
115            eprintln!("[ipc-socket] patch sent, ack: {}", ack);
116            Ok(true)
117        }
118        Ok(None) => {
119            eprintln!("[ipc-socket] patch sent, no ack");
120            Ok(true)
121        }
122        Err(e) => {
123            eprintln!("[ipc-socket] send failed: {}", e);
124            Ok(false)
125        }
126    }
127}
128
129/// Send a reposition boundary message.
130pub fn send_reposition(project_root: &Path, file: &str) -> Result<bool> {
131    let message = serde_json::json!({
132        "type": "reposition",
133        "file": file,
134    });
135
136    match send_message(project_root, &message) {
137        Ok(_) => Ok(true),
138        Err(e) => {
139            eprintln!("[ipc-socket] reposition failed: {}", e);
140            Ok(false)
141        }
142    }
143}
144
145/// Send a VCS refresh signal.
146pub fn send_vcs_refresh(project_root: &Path) -> Result<bool> {
147    let message = serde_json::json!({
148        "type": "vcs_refresh",
149    });
150
151    match send_message(project_root, &message) {
152        Ok(_) => Ok(true),
153        Err(e) => {
154            eprintln!("[ipc-socket] vcs_refresh failed: {}", e);
155            Ok(false)
156        }
157    }
158}
159
160/// Start a socket listener (for use by the FFI library / plugin).
161/// This blocks the calling thread — run it on a background thread.
162#[allow(unreachable_code)]
163pub fn start_listener<F>(project_root: &Path, handler: F) -> Result<()>
164where
165    F: Fn(&str) -> Option<String> + Send + 'static,
166{
167    let sock_path = socket_path(project_root);
168
169    // Clean up stale socket
170    if sock_path.exists() {
171        let _ = std::fs::remove_file(&sock_path);
172    }
173
174    // Ensure parent directory exists
175    if let Some(parent) = sock_path.parent() {
176        std::fs::create_dir_all(parent)?;
177    }
178
179    eprintln!("[ipc-socket] listening on {:?}", sock_path);
180
181    let name = sock_path.to_fs_name::<GenericFilePath>()?;
182    let opts = ListenerOptions::new().name(name);
183    let listener = opts.create_sync()?;
184
185    loop {
186        match listener.accept() {
187            Ok(stream) => {
188                let (reader_half, mut writer_half) = stream.split();
189                let mut reader = BufReader::new(reader_half);
190                let mut line = String::new();
191
192                while reader.read_line(&mut line).unwrap_or(0) > 0 {
193                    let trimmed = line.trim();
194                    if !trimmed.is_empty()
195                        && let Some(response) = handler(trimmed)
196                    {
197                        let mut resp = response;
198                        resp.push('\n');
199                        let _ = writer_half.write_all(resp.as_bytes());
200                        let _ = writer_half.flush();
201                    }
202                    line.clear();
203                }
204            }
205            Err(e) => {
206                eprintln!("[ipc-socket] accept error: {}", e);
207            }
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// End-to-end check: a listener started on a background thread receives a
    /// `vcs_refresh` message from `send_message` and its ack comes back intact.
    #[test]
    fn socket_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        std::fs::create_dir_all(root.join(".agent-doc")).unwrap();

        let root_clone = root.clone();
        let server = thread::spawn(move || {
            start_listener(&root_clone, |msg| {
                let v: serde_json::Value = serde_json::from_str(msg).ok()?;
                Some(serde_json::json!({"type": "ack", "id": v["type"]}).to_string())
            })
            .ok();
        });

        // The listener starts asynchronously, so retry the send until it is
        // reachable instead of sleeping for a fixed, machine-dependent time.
        let msg = serde_json::json!({"type": "vcs_refresh"});
        let deadline = Instant::now() + Duration::from_secs(5);
        let result = loop {
            match send_message(&root, &msg) {
                Ok(reply) => break reply,
                Err(e) => {
                    assert!(
                        Instant::now() < deadline,
                        "listener never became reachable: {:#}",
                        e
                    );
                    thread::sleep(Duration::from_millis(10));
                }
            }
        };

        assert!(result.is_some());
        let ack: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
        assert_eq!(ack["type"], "ack");
        assert_eq!(ack["id"], "vcs_refresh");

        // Best-effort cleanup of the socket file. This does not stop the
        // listener: its loop never exits, so the thread is simply detached and
        // ends when the test process does.
        let _ = std::fs::remove_file(socket_path(&root));
        drop(server);
    }
}
247}