use anyhow::{Context, Result};
use interprocess::local_socket::{
GenericFilePath, ListenerOptions, ToFsName,
traits::{Listener as _, Stream as _},
};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;
/// File name of the IPC socket; `socket_path` places it inside the project's
/// `.agent-doc` directory.
const SOCKET_FILENAME: &str = "ipc.sock";
/// Builds the location of the IPC socket for a project.
///
/// The result is `<project_root>/.agent-doc/<SOCKET_FILENAME>`. Nothing is
/// created or checked on disk; this is pure path arithmetic.
pub fn socket_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.join(".agent-doc");
    path.push(SOCKET_FILENAME);
    path
}
/// Reports whether something is accepting connections on the project's IPC
/// socket.
///
/// Returns `false` immediately when the socket path does not exist. Otherwise
/// a probe connection is attempted: if it succeeds the result is `true`; if it
/// fails, the socket file is removed (best effort, removal errors are ignored)
/// and the result is `false`.
pub fn is_listener_active(project_root: &Path) -> bool {
    let sock = socket_path(project_root);
    if !sock.exists() {
        return false;
    }

    let reachable = try_connect(project_root).is_ok();
    if !reachable {
        // The file is there but nobody answered: clear it out, ignoring any
        // failure to delete.
        let _ = std::fs::remove_file(&sock);
    }
    reachable
}
/// Opens a blocking client connection to the project's IPC socket.
///
/// # Errors
///
/// Fails if the socket path cannot be turned into a local-socket name, or if
/// the connection attempt itself fails (reported with the context
/// "failed to connect to IPC socket").
fn try_connect(project_root: &Path) -> Result<interprocess::local_socket::Stream> {
    let sock_file = socket_path(project_root);
    let sock_name = sock_file.to_fs_name::<GenericFilePath>()?;

    interprocess::local_socket::ConnectOptions::new()
        .name(sock_name)
        .connect_sync()
        .context("failed to connect to IPC socket")
}
pub fn send_message(project_root: &Path, message: &serde_json::Value) -> Result<Option<String>> {
let stream = try_connect(project_root)?;
let (reader_half, mut writer_half) = stream.split();
let mut msg = serde_json::to_string(message)?;
msg.push('\n');
writer_half.write_all(msg.as_bytes())?;
writer_half.flush()?;
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let mut reader = BufReader::new(reader_half);
let mut ack_line = String::new();
let result = reader.read_line(&mut ack_line);
let _ = tx.send((result, ack_line));
});
match rx.recv_timeout(Duration::from_secs(2)) {
Ok((Ok(0), _)) => Err(anyhow::anyhow!("IPC ack: plugin closed connection without responding")),
Ok((Ok(_), line)) => Ok(Some(line.trim().to_string())),
Ok((Err(e), _)) => Err(anyhow::anyhow!("IPC ack read error: {}", e)),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(anyhow::anyhow!("IPC ack timeout (2s)")),
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Err(anyhow::anyhow!("IPC reader thread disconnected")),
}
}
/// Sends a `"patch"` message for `file` over the project's IPC socket.
///
/// `patches_json` must be valid JSON; it is parsed and embedded as the
/// `"patches"` field. `frontmatter_yaml` is passed through as the
/// `"frontmatter"` field (`null` when `None`). The outcome is logged to
/// stderr.
///
/// # Returns
///
/// `Ok(true)` whenever the message was delivered, with or without an ack
/// string.
///
/// # Errors
///
/// Fails if `patches_json` does not parse or if `send_message` fails.
pub fn send_patch(
    project_root: &Path,
    file: &str,
    patches_json: &str,
    frontmatter_yaml: Option<&str>,
) -> Result<bool> {
    // Parse up front so malformed input is rejected before anything is sent.
    let patches: serde_json::Value = serde_json::from_str(patches_json)?;
    let message = serde_json::json!({
        "type": "patch",
        "file": file,
        "patches": patches,
        "frontmatter": frontmatter_yaml,
    });

    match send_message(project_root, &message)? {
        Some(ack) => eprintln!("[ipc-socket] patch sent, ack: {}", ack),
        None => eprintln!("[ipc-socket] patch sent, no ack"),
    }
    Ok(true)
}
/// Sends a `"reposition"` message for `file` over the project's IPC socket.
///
/// Returns `Ok(true)` when `send_message` succeeds (the ack text is
/// discarded) and propagates its error otherwise.
pub fn send_reposition(project_root: &Path, file: &str) -> Result<bool> {
    let request = serde_json::json!({
        "type": "reposition",
        "file": file,
    });
    send_message(project_root, &request)?;
    Ok(true)
}
/// Sends a `"vcs_refresh"` message over the project's IPC socket.
///
/// Returns `Ok(true)` when `send_message` succeeds (the ack text is
/// discarded) and propagates its error otherwise.
pub fn send_vcs_refresh(project_root: &Path) -> Result<bool> {
    let request = serde_json::json!({
        "type": "vcs_refresh",
    });
    send_message(project_root, &request)?;
    Ok(true)
}
/// Runs a blocking IPC listener on the project's socket, serving one
/// connection at a time.
///
/// Setup: any existing file at the socket path is removed (errors ignored),
/// the parent directory is created if needed, and the listener is bound.
///
/// Serving: each accepted connection is read line by line. Every non-empty
/// trimmed line is passed to `handler`; when the handler returns
/// `Some(response)`, the response plus a trailing `\n` is written back and
/// flushed. Write and flush failures are logged to stderr and the connection
/// keeps being read. A read failure is logged and ends that connection. Accept
/// failures are logged and the loop continues.
///
/// # Returns
///
/// The accept loop never exits, so this function does not return `Ok`.
///
/// # Errors
///
/// Fails only during setup: creating the parent directory, converting the
/// path to a local-socket name, or binding the listener.
// NOTE(review): nothing in this function is visibly unreachable; the allow
// looks like a leftover — confirm before removing.
#[allow(unreachable_code)]
pub fn start_listener<F>(project_root: &Path, handler: F) -> Result<()>
where
    F: Fn(&str) -> Option<String> + Send + 'static,
{
    let sock_path = socket_path(project_root);
    if sock_path.exists() {
        // Best effort: a leftover file would make binding fail.
        let _ = std::fs::remove_file(&sock_path);
    }
    if let Some(parent) = sock_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    eprintln!("[ipc-socket] listening on {:?}", sock_path);
    let name = sock_path.to_fs_name::<GenericFilePath>()?;
    let opts = ListenerOptions::new().name(name);
    let listener = opts.create_sync()?;

    loop {
        match listener.accept() {
            Ok(stream) => {
                let (reader_half, mut writer_half) = stream.split();
                let mut reader = BufReader::new(reader_half);
                let mut line = String::new();
                loop {
                    // Reuse one buffer for every line on this connection.
                    line.clear();
                    match reader.read_line(&mut line) {
                        // EOF: the peer closed its side.
                        Ok(0) => break,
                        Ok(_) => {}
                        Err(e) => {
                            // Previously swallowed as if it were EOF; report it
                            // so I/O or UTF-8 failures are diagnosable.
                            eprintln!("[ipc-socket] handler read error: {}", e);
                            break;
                        }
                    }

                    let trimmed = line.trim();
                    if trimmed.is_empty() {
                        continue;
                    }
                    let Some(mut resp) = handler(trimmed) else {
                        continue;
                    };
                    resp.push('\n');
                    if let Err(e) = writer_half.write_all(resp.as_bytes()) {
                        eprintln!("[ipc-socket] handler write error: {}", e);
                    }
                    if let Err(e) = writer_half.flush() {
                        eprintln!("[ipc-socket] handler flush error: {}", e);
                    }
                }
            }
            Err(e) => {
                eprintln!("[ipc-socket] accept error: {}", e);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Starts a listener on a temporary project root, sends one message
    /// through `send_message`, and checks the ack produced by the handler.
    #[test]
    fn socket_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();
        std::fs::create_dir_all(root.join(".agent-doc")).unwrap();

        let listener_root = root.clone();
        let server = thread::spawn(move || {
            // The handler echoes the incoming "type" back as the ack "id";
            // unparseable input yields no response.
            let _ = start_listener(&listener_root, |raw| {
                serde_json::from_str::<serde_json::Value>(raw)
                    .ok()
                    .map(|parsed| {
                        serde_json::json!({"type": "ack", "id": parsed["type"]}).to_string()
                    })
            });
        });

        // Give the listener thread a moment to bind before connecting.
        thread::sleep(Duration::from_millis(100));

        let request = serde_json::json!({"type": "vcs_refresh"});
        let reply = send_message(&root, &request).unwrap();
        assert!(reply.is_some());

        let ack: serde_json::Value = serde_json::from_str(&reply.unwrap()).unwrap();
        assert_eq!(ack["type"], "ack");
        assert_eq!(ack["id"], "vcs_refresh");

        let _ = std::fs::remove_file(socket_path(&root));
        // The listener loops forever, so the handle is dropped (detaching the
        // thread) rather than joined.
        drop(server);
    }
}