agent-doc 0.32.3

Interactive document sessions with AI agents
Documentation
//! Socket-based IPC for editor plugin communication.
//!
//! Uses Unix domain sockets (Linux/macOS) or Windows named pipes via the
//! `interprocess` crate. The socket replaces the file-based IPC mechanism
//! (NIO WatchService + patch files) for lower latency and no inotify issues.
//!
//! ## Architecture
//!
//! - **Listener** (plugin side): The editor plugin starts a socket listener
//!   at `.agent-doc/ipc.sock`. It accepts connections and processes JSON messages.
//! - **Sender** (CLI side): The `agent-doc write` command connects to the socket
//!   and sends patch JSON. Falls back to file-based IPC if socket unavailable.
//!
//! ## Protocol
//!
//! Messages are newline-delimited JSON (NDJSON). Each message is a single line
//! terminated by `\n`. The receiver reads lines and parses each as JSON.
//!
//! Message types:
//! - `{"type": "patch", "file": "...", "patches": [...], "frontmatter": "..."}` — apply patches
//! - `{"type": "reposition", "file": "..."}` — reposition boundary marker
//! - `{"type": "vcs_refresh"}` — trigger VCS refresh
//! - `{"type": "ack", "id": "..."}` — acknowledgment from plugin

use anyhow::{Context, Result};
use interprocess::local_socket::{
    GenericFilePath, ListenerOptions, ToFsName,
    traits::{Listener as _, Stream as _},
};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;

/// Name of the IPC socket file inside a project's `.agent-doc/` directory.
const SOCKET_FILENAME: &str = "ipc.sock";

/// Build the IPC socket path for a project: `<project_root>/.agent-doc/ipc.sock`.
///
/// This is a pure path computation; nothing is created or checked on disk.
pub fn socket_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".agent-doc");
    path.push(SOCKET_FILENAME);
    path
}

/// Report whether something is accepting connections on the project's socket.
///
/// Returns `false` immediately when the socket file does not exist. Otherwise
/// a connection attempt is made: success means a listener is active. On
/// failure the socket file is treated as stale and removed (removal errors
/// are ignored), and `false` is returned.
pub fn is_listener_active(project_root: &Path) -> bool {
    let sock = socket_path(project_root);

    // No socket file at all — nothing can be listening.
    if !sock.exists() {
        return false;
    }

    // A successful connect proves a live listener; the probe stream is
    // dropped right away.
    if try_connect(project_root).is_ok() {
        return true;
    }

    // Stale socket file — clean it up (best effort).
    let _ = std::fs::remove_file(&sock);
    false
}

/// Open a client connection to the project's IPC socket.
///
/// # Errors
///
/// Fails if the socket path cannot be turned into a filesystem socket name,
/// or if the connection attempt itself fails (reported with the context
/// "failed to connect to IPC socket").
fn try_connect(project_root: &Path) -> Result<interprocess::local_socket::Stream> {
    let name = socket_path(project_root).to_fs_name::<GenericFilePath>()?;
    interprocess::local_socket::ConnectOptions::new()
        .name(name)
        .connect_sync()
        .context("failed to connect to IPC socket")
}

/// Send a JSON message to the plugin via socket IPC.
/// Returns Ok(response) if the plugin acknowledges, Err if socket unavailable.
pub fn send_message(project_root: &Path, message: &serde_json::Value) -> Result<Option<String>> {
    let stream = try_connect(project_root)?;

    // interprocess Stream implements Read + Write via halves
    let (reader_half, mut writer_half) = stream.split();

    // Send NDJSON message
    let mut msg = serde_json::to_string(message)?;
    msg.push('\n');
    writer_half.write_all(msg.as_bytes())?;
    writer_half.flush()?;

    // Read ack (with manual timeout via thread)
    let (tx, rx) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        let mut reader = BufReader::new(reader_half);
        let mut ack_line = String::new();
        let result = reader.read_line(&mut ack_line);
        let _ = tx.send((result, ack_line));
    });

    match rx.recv_timeout(Duration::from_secs(2)) {
        Ok((Ok(0), _)) => Err(anyhow::anyhow!("IPC ack: plugin closed connection without responding")),
        Ok((Ok(_), line)) => Ok(Some(line.trim().to_string())),
        Ok((Err(e), _)) => Err(anyhow::anyhow!("IPC ack read error: {}", e)),
        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(anyhow::anyhow!("IPC ack timeout (2s)")),
        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => Err(anyhow::anyhow!("IPC reader thread disconnected")),
    }
}

/// Send a `"patch"` message for `file` to the plugin.
///
/// `patches_json` must be valid JSON text; it is parsed and embedded as the
/// message's `"patches"` value. `frontmatter_yaml` is passed through as the
/// `"frontmatter"` field (`null` when `None`).
///
/// # Returns
///
/// `Ok(true)` once the message has been delivered; the outcome is logged to
/// stderr.
///
/// # Errors
///
/// Fails if `patches_json` is not valid JSON or if sending the message fails.
pub fn send_patch(
    project_root: &Path,
    file: &str,
    patches_json: &str,
    frontmatter_yaml: Option<&str>,
) -> Result<bool> {
    let patches: serde_json::Value = serde_json::from_str(patches_json)?;
    let message = serde_json::json!({
        "type": "patch",
        "file": file,
        "patches": patches,
        "frontmatter": frontmatter_yaml,
    });

    let reply = send_message(project_root, &message)?;
    if let Some(ack) = reply {
        eprintln!("[ipc-socket] patch sent, ack: {}", ack);
    } else {
        eprintln!("[ipc-socket] patch sent, no ack");
    }
    Ok(true)
}

/// Ask the plugin to reposition the boundary marker in `file`.
///
/// Returns `Ok(true)` when the message was sent and a reply was received;
/// the reply content itself is discarded. Any send failure is propagated.
pub fn send_reposition(project_root: &Path, file: &str) -> Result<bool> {
    let message = serde_json::json!({
        "type": "reposition",
        "file": file,
    });

    send_message(project_root, &message)?;
    Ok(true)
}

/// Tell the plugin to trigger a VCS refresh.
///
/// Returns `Ok(true)` when the message was sent and a reply was received;
/// the reply content itself is discarded. Any send failure is propagated.
pub fn send_vcs_refresh(project_root: &Path) -> Result<bool> {
    let message = serde_json::json!({
        "type": "vcs_refresh",
    });

    send_message(project_root, &message)?;
    Ok(true)
}

/// Start a socket listener (for use by the FFI library / plugin).
/// This blocks the calling thread — run it on a background thread.
#[allow(unreachable_code)]
pub fn start_listener<F>(project_root: &Path, handler: F) -> Result<()>
where
    F: Fn(&str) -> Option<String> + Send + 'static,
{
    let sock_path = socket_path(project_root);

    // Clean up stale socket
    if sock_path.exists() {
        let _ = std::fs::remove_file(&sock_path);
    }

    // Ensure parent directory exists
    if let Some(parent) = sock_path.parent() {
        std::fs::create_dir_all(parent)?;
    }

    eprintln!("[ipc-socket] listening on {:?}", sock_path);

    let name = sock_path.to_fs_name::<GenericFilePath>()?;
    let opts = ListenerOptions::new().name(name);
    let listener = opts.create_sync()?;

    loop {
        match listener.accept() {
            Ok(stream) => {
                let (reader_half, mut writer_half) = stream.split();
                let mut reader = BufReader::new(reader_half);
                let mut line = String::new();

                while reader.read_line(&mut line).unwrap_or(0) > 0 {
                    let trimmed = line.trim();
                    if !trimmed.is_empty()
                        && let Some(response) = handler(trimmed)
                    {
                        let mut resp = response;
                        resp.push('\n');
                        if let Err(e) = writer_half.write_all(resp.as_bytes()) {
                            eprintln!("[ipc-socket] handler write error: {}", e);
                        }
                        if let Err(e) = writer_half.flush() {
                            eprintln!("[ipc-socket] handler flush error: {}", e);
                        }
                    }
                    line.clear();
                }
            }
            Err(e) => {
                eprintln!("[ipc-socket] accept error: {}", e);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    /// End-to-end check: a listener on a temp project root receives a message
    /// from `send_message` and its ack makes it back to the sender.
    #[test]
    fn socket_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        std::fs::create_dir_all(root.join(".agent-doc")).unwrap();

        let root_clone = root.clone();
        let server = thread::spawn(move || {
            start_listener(&root_clone, |msg| {
                let v: serde_json::Value = serde_json::from_str(msg).ok()?;
                Some(serde_json::json!({"type": "ack", "id": v["type"]}).to_string())
            })
            .ok();
        });

        // The listener binds asynchronously, so retry the send until it is
        // reachable rather than relying on a fixed sleep (which is racy on a
        // loaded machine). Give up after a generous deadline.
        let msg = serde_json::json!({"type": "vcs_refresh"});
        let deadline = Instant::now() + Duration::from_secs(5);
        let result = loop {
            match send_message(&root, &msg) {
                Ok(reply) => break reply,
                Err(e) if Instant::now() >= deadline => {
                    panic!("listener never became reachable: {e:#}")
                }
                Err(_) => thread::sleep(Duration::from_millis(10)),
            }
        };

        assert!(result.is_some());
        let ack: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
        assert_eq!(ack["type"], "ack");
        assert_eq!(ack["id"], "vcs_refresh");

        // Best-effort cleanup of the socket file. The listener thread loops
        // forever and is not stopped by this; dropping the handle only
        // detaches it, and it ends when the test process exits.
        let _ = std::fs::remove_file(socket_path(&root));
        drop(server);
    }
}