collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
//! ACP stdio server — reads JSON-RPC from stdin, writes responses/notifications to stdout.
//!
//! Usage: `collet acp serve`

use anyhow::Result;
use serde_json::json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

use crate::api::provider::OpenAiCompatibleProvider;
use crate::config::Config;
use crate::mcp::protocol::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};

use super::protocol::*;
use super::session::{SessionManager, agent_event_to_update};

/// Run the ACP server on stdio. Blocks until stdin is closed.
///
/// Reads newline-delimited JSON-RPC requests from stdin and writes one
/// response line per request to stdout. Blank lines are skipped. A line that
/// fails to parse is answered with a `-32700` error (using id `0`, because the
/// request id is unknown at that point) and the loop continues.
///
/// After a `session/prompt` request has been answered *successfully*, the
/// session's events are streamed as notifications before the next request is
/// read. Requests are therefore handled strictly one at a time: no further
/// stdin lines are consumed while a prompt is streaming.
///
/// # Errors
///
/// Returns an error if reading stdin or writing/flushing stdout fails, or if
/// a response cannot be serialized.
pub async fn run_acp_server(config: Config, client: OpenAiCompatibleProvider) -> Result<()> {
    let stdin = tokio::io::stdin();
    let mut stdout = tokio::io::stdout();
    let mut reader = BufReader::new(stdin);

    let mut session_mgr = SessionManager::new(config, client);
    // One buffer reused for every incoming line.
    let mut line = String::new();

    tracing::info!("ACP server started on stdio");

    loop {
        line.clear();
        let bytes_read = reader.read_line(&mut line).await?;
        if bytes_read == 0 {
            // stdin closed
            tracing::info!("ACP server: stdin closed, shutting down");
            break;
        }

        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        // Parse JSON-RPC request
        let request: JsonRpcRequest = match serde_json::from_str(trimmed) {
            Ok(r) => r,
            Err(e) => {
                let err_resp = make_error_response(0, -32700, &format!("Parse error: {}", e));
                write_response(&mut stdout, &err_resp).await?;
                continue;
            }
        };

        tracing::debug!(method = %request.method, id = request.id, "ACP request");

        let response = handle_request(&mut session_mgr, &mut stdout, &request).await;
        write_response(&mut stdout, &response).await?;

        // Stream events only for a session/prompt that was actually accepted.
        // A rejected prompt started no agent run, so waiting on the session's
        // event channel could stall the request loop or emit a spurious update.
        if response.error.is_none()
            && request.method == METHOD_SESSION_PROMPT
            && let Some(params) = &request.params
            && let Ok(prompt_params) = serde_json::from_value::<SessionPromptParams>(params.clone())
        {
            stream_events(&mut session_mgr, &mut stdout, &prompt_params.session_id).await?;
        }
    }

    Ok(())
}

async fn handle_request(
    session_mgr: &mut SessionManager,
    _stdout: &mut tokio::io::Stdout,
    request: &JsonRpcRequest,
) -> JsonRpcResponse {
    let id = request.id;
    let params = request.params.clone().unwrap_or(json!({}));

    match request.method.as_str() {
        METHOD_SESSION_INITIALIZE => {
            let _params: InitializeParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => {
                    return make_error_response(id, -32602, &format!("Invalid params: {}", e));
                }
            };

            let result = InitializeResult {
                name: "collet".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                capabilities: AcpCapabilities::default(),
            };

            make_success_response(id, serde_json::to_value(result).unwrap())
        }

        METHOD_SESSION_NEW => {
            let params: SessionNewParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => {
                    return make_error_response(id, -32602, &format!("Invalid params: {}", e));
                }
            };

            match session_mgr.create_session(params).await {
                Ok(session_id) => {
                    let result = SessionNewResult { session_id };
                    make_success_response(id, serde_json::to_value(result).unwrap())
                }
                Err(e) => {
                    make_error_response(id, -32000, &format!("Session creation failed: {}", e))
                }
            }
        }

        METHOD_SESSION_PROMPT => {
            let params: SessionPromptParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => {
                    return make_error_response(id, -32602, &format!("Invalid params: {}", e));
                }
            };

            match session_mgr.send_prompt(&params.session_id, params.text, params.mode) {
                Ok(()) => {
                    let result = SessionPromptResult { accepted: true };
                    make_success_response(id, serde_json::to_value(result).unwrap())
                }
                Err(e) => make_error_response(id, -32000, &format!("Prompt failed: {}", e)),
            }
        }

        METHOD_SESSION_CANCEL => {
            let params: SessionCancelParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => {
                    return make_error_response(id, -32602, &format!("Invalid params: {}", e));
                }
            };

            let cancelled = session_mgr.cancel_session(&params.session_id);
            let result = SessionCancelResult { cancelled };
            make_success_response(id, serde_json::to_value(result).unwrap())
        }

        METHOD_SESSION_CLOSE => {
            let params: SessionCloseParams = match serde_json::from_value(params) {
                Ok(p) => p,
                Err(e) => {
                    return make_error_response(id, -32602, &format!("Invalid params: {}", e));
                }
            };

            let closed = session_mgr.remove_session(&params.session_id).is_some();
            let result = SessionCloseResult { closed };
            make_success_response(id, serde_json::to_value(result).unwrap())
        }

        _ => make_error_response(id, -32601, &format!("Method not found: {}", request.method)),
    }
}

/// Stream `AgentEvent`s as `session/update` notifications until Done.
///
/// Returns `Ok(())` immediately when `session_id` is not known to
/// `session_mgr`. Otherwise events are received from the session's channel
/// and every event that `agent_event_to_update` maps to a payload is written
/// to `stdout` as a notification. Streaming ends when:
/// - a `Done` event arrives — its context is handed back through
///   `SessionManager::return_context`, or
/// - the channel yields `None` — a `Done` payload is emitted so the client
///   still sees the stream terminate.
///
/// # Errors
///
/// Returns an error if a notification cannot be serialized or written.
async fn stream_events(
    session_mgr: &mut SessionManager,
    stdout: &mut tokio::io::Stdout,
    session_id: &str,
) -> Result<()> {
    let Some(session) = session_mgr.get_session_mut(session_id) else {
        return Ok(());
    };

    // Label notifications with the ID stored on the session itself.
    let sid = session.session_id.clone();

    loop {
        let event = session.event_rx.recv().await;

        // A closed channel is reported to the client as a final Done update.
        let payload = match &event {
            Some(ev) => agent_event_to_update(ev),
            None => Some(UpdatePayload::Done),
        };

        if let Some(payload) = payload {
            let notif = SessionUpdateNotification {
                session_id: sid.clone(),
                payload,
            };
            // Propagate serialization failures instead of panicking the server.
            let rpc_notif =
                JsonRpcNotification::new(METHOD_SESSION_UPDATE, serde_json::to_value(&notif)?);
            write_notification(stdout, &rpc_notif).await?;
        }

        match event {
            // The event is consumed here, so the context moves out without a clone.
            Some(crate::agent::r#loop::AgentEvent::Done { context, .. }) => {
                session_mgr.return_context(&sid, context);
                break;
            }
            Some(_) => {}
            // Channel closed unexpectedly
            None => break,
        }
    }

    Ok(())
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Build a JSON-RPC 2.0 response for `id` that carries `result` and no error.
fn make_success_response(id: u64, result: serde_json::Value) -> JsonRpcResponse {
    let jsonrpc = String::from("2.0");
    JsonRpcResponse {
        jsonrpc,
        id,
        result: Some(result),
        error: None,
    }
}

fn make_error_response(id: u64, code: i64, message: &str) -> JsonRpcResponse {
    JsonRpcResponse {
        jsonrpc: "2.0".to_string(),
        id,
        result: None,
        error: Some(JsonRpcError {
            code,
            message: message.to_string(),
            data: None,
        }),
    }
}

/// Serialize `response` as a single JSON line, write it to `stdout`, and flush.
///
/// # Errors
///
/// Returns an error if serialization, the write, or the flush fails.
async fn write_response(stdout: &mut tokio::io::Stdout, response: &JsonRpcResponse) -> Result<()> {
    // One message per line: JSON bytes followed by a newline terminator.
    let mut frame = serde_json::to_vec(response)?;
    frame.push(b'\n');

    stdout.write_all(&frame).await?;
    Ok(stdout.flush().await?)
}

/// Serialize `notification` as a single JSON line, write it to `stdout`, and flush.
///
/// # Errors
///
/// Returns an error if serialization, the write, or the flush fails.
async fn write_notification(
    stdout: &mut tokio::io::Stdout,
    notification: &JsonRpcNotification,
) -> Result<()> {
    // One message per line: JSON bytes followed by a newline terminator.
    let mut frame = serde_json::to_vec(notification)?;
    frame.push(b'\n');

    stdout.write_all(&frame).await?;
    Ok(stdout.flush().await?)
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// A success response keeps the caller's id and carries a result but no error.
    #[test]
    fn test_make_success_response() {
        let payload = json!({"session_id": "acp-1"});
        let JsonRpcResponse {
            id, result, error, ..
        } = make_success_response(1, payload);

        assert_eq!(id, 1);
        assert!(result.is_some());
        assert!(error.is_none());
    }

    /// An error response keeps the caller's id and carries the error code but no result.
    #[test]
    fn test_make_error_response() {
        let JsonRpcResponse {
            id, result, error, ..
        } = make_error_response(2, -32601, "Method not found");

        assert_eq!(id, 2);
        assert!(result.is_none());
        assert_eq!(error.map(|e| e.code), Some(-32601));
    }
}