use anyhow::Result;
use serde_json::json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::api::provider::OpenAiCompatibleProvider;
use crate::config::Config;
use crate::mcp::protocol::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};
use super::protocol::*;
use super::session::{SessionManager, agent_event_to_update};
/// Runs the ACP server over stdio until stdin reaches end-of-file.
///
/// Each non-blank line read from stdin is parsed as one JSON-RPC request, dispatched through
/// `handle_request`, and answered with a single-line response on stdout. Lines that fail to
/// parse are answered with error code `-32700` and id `0`, and the loop keeps running.
///
/// After a `METHOD_SESSION_PROMPT` request has been answered *successfully*, the session's
/// events are streamed to stdout via `stream_events` before the next request is read.
///
/// # Errors
///
/// Returns an error if reading from stdin, writing a response, or streaming events fails.
pub async fn run_acp_server(config: Config, client: OpenAiCompatibleProvider) -> Result<()> {
    let stdin = tokio::io::stdin();
    let mut stdout = tokio::io::stdout();
    let mut reader = BufReader::new(stdin);
    let mut session_mgr = SessionManager::new(config, client);
    // One line buffer reused for every request to avoid a per-line allocation.
    let mut line = String::new();

    tracing::info!("ACP server started on stdio");

    loop {
        line.clear();
        let bytes_read = reader.read_line(&mut line).await?;
        if bytes_read == 0 {
            tracing::info!("ACP server: stdin closed, shutting down");
            break;
        }

        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        let request: JsonRpcRequest = match serde_json::from_str(trimmed) {
            Ok(r) => r,
            Err(e) => {
                let err_resp = make_error_response(0, -32700, &format!("Parse error: {}", e));
                write_response(&mut stdout, &err_resp).await?;
                continue;
            }
        };

        tracing::debug!(method = %request.method, id = request.id, "ACP request");

        let response = handle_request(&mut session_mgr, &mut stdout, &request).await;
        write_response(&mut stdout, &response).await?;

        // Only stream when the prompt was actually accepted. Streaming after a rejected
        // prompt could wait on the session's event channel for events that never arrive,
        // which would stall the whole request loop.
        if request.method == METHOD_SESSION_PROMPT
            && response.error.is_none()
            && let Some(params) = &request.params
            && let Ok(prompt_params) = serde_json::from_value::<SessionPromptParams>(params.clone())
        {
            stream_events(&mut session_mgr, &mut stdout, &prompt_params.session_id).await?;
        }
    }

    Ok(())
}
async fn handle_request(
session_mgr: &mut SessionManager,
_stdout: &mut tokio::io::Stdout,
request: &JsonRpcRequest,
) -> JsonRpcResponse {
let id = request.id;
let params = request.params.clone().unwrap_or(json!({}));
match request.method.as_str() {
METHOD_SESSION_INITIALIZE => {
let _params: InitializeParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return make_error_response(id, -32602, &format!("Invalid params: {}", e));
}
};
let result = InitializeResult {
name: "collet".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
capabilities: AcpCapabilities::default(),
};
make_success_response(id, serde_json::to_value(result).unwrap())
}
METHOD_SESSION_NEW => {
let params: SessionNewParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return make_error_response(id, -32602, &format!("Invalid params: {}", e));
}
};
match session_mgr.create_session(params).await {
Ok(session_id) => {
let result = SessionNewResult { session_id };
make_success_response(id, serde_json::to_value(result).unwrap())
}
Err(e) => {
make_error_response(id, -32000, &format!("Session creation failed: {}", e))
}
}
}
METHOD_SESSION_PROMPT => {
let params: SessionPromptParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return make_error_response(id, -32602, &format!("Invalid params: {}", e));
}
};
match session_mgr.send_prompt(¶ms.session_id, params.text, params.mode) {
Ok(()) => {
let result = SessionPromptResult { accepted: true };
make_success_response(id, serde_json::to_value(result).unwrap())
}
Err(e) => make_error_response(id, -32000, &format!("Prompt failed: {}", e)),
}
}
METHOD_SESSION_CANCEL => {
let params: SessionCancelParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return make_error_response(id, -32602, &format!("Invalid params: {}", e));
}
};
let cancelled = session_mgr.cancel_session(¶ms.session_id);
let result = SessionCancelResult { cancelled };
make_success_response(id, serde_json::to_value(result).unwrap())
}
METHOD_SESSION_CLOSE => {
let params: SessionCloseParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return make_error_response(id, -32602, &format!("Invalid params: {}", e));
}
};
let closed = session_mgr.remove_session(¶ms.session_id).is_some();
let result = SessionCloseResult { closed };
make_success_response(id, serde_json::to_value(result).unwrap())
}
_ => make_error_response(id, -32601, &format!("Method not found: {}", request.method)),
}
}
async fn stream_events(
session_mgr: &mut SessionManager,
stdout: &mut tokio::io::Stdout,
session_id: &str,
) -> Result<()> {
let session = match session_mgr.get_session_mut(session_id) {
Some(s) => s,
None => return Ok(()),
};
let sid = session.session_id.clone();
loop {
match session.event_rx.recv().await {
Some(event) => {
let is_done = matches!(&event, crate::agent::r#loop::AgentEvent::Done { .. });
let returned_context =
if let crate::agent::r#loop::AgentEvent::Done { context, .. } = &event {
Some(context.clone())
} else {
None
};
if let Some(payload) = agent_event_to_update(&event) {
let notif = SessionUpdateNotification {
session_id: sid.clone(),
payload,
};
let rpc_notif = JsonRpcNotification::new(
METHOD_SESSION_UPDATE,
serde_json::to_value(¬if).unwrap(),
);
write_notification(stdout, &rpc_notif).await?;
}
if is_done {
if let Some(ctx) = returned_context {
session_mgr.return_context(&sid, ctx);
}
break;
}
}
None => {
let notif = SessionUpdateNotification {
session_id: sid.clone(),
payload: UpdatePayload::Done,
};
let rpc_notif = JsonRpcNotification::new(
METHOD_SESSION_UPDATE,
serde_json::to_value(¬if).unwrap(),
);
write_notification(stdout, &rpc_notif).await?;
break;
}
}
}
Ok(())
}
/// Builds a JSON-RPC `"2.0"` response for `id` that carries `result` and no error.
fn make_success_response(id: u64, result: serde_json::Value) -> JsonRpcResponse {
    let jsonrpc = String::from("2.0");
    let result = Some(result);
    JsonRpcResponse {
        jsonrpc,
        id,
        result,
        error: None,
    }
}
fn make_error_response(id: u64, code: i64, message: &str) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id,
result: None,
error: Some(JsonRpcError {
code,
message: message.to_string(),
data: None,
}),
}
}
/// Serializes `response` to JSON, appends a newline, writes it to `stdout`, and flushes.
///
/// # Errors
///
/// Returns an error if serialization, the write, or the flush fails.
async fn write_response(stdout: &mut tokio::io::Stdout, response: &JsonRpcResponse) -> Result<()> {
    // One message per line: JSON bytes followed by a single '\n'.
    let mut frame = serde_json::to_vec(response)?;
    frame.push(b'\n');
    stdout.write_all(&frame).await?;
    stdout.flush().await?;
    Ok(())
}
/// Serializes `notification` to JSON, appends a newline, writes it to `stdout`, and flushes.
///
/// # Errors
///
/// Returns an error if serialization, the write, or the flush fails.
async fn write_notification(
    stdout: &mut tokio::io::Stdout,
    notification: &JsonRpcNotification,
) -> Result<()> {
    // One message per line: JSON bytes followed by a single '\n'.
    let mut frame = serde_json::to_vec(notification)?;
    frame.push(b'\n');
    stdout.write_all(&frame).await?;
    stdout.flush().await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A success response keeps the request id, carries a result, and has no error.
    #[test]
    fn test_make_success_response() {
        let response = make_success_response(1, json!({"session_id": "acp-1"}));
        assert_eq!(response.id, 1);
        assert!(response.result.is_some());
        assert!(response.error.is_none());
    }

    /// An error response keeps the request id, has no result, and exposes the error code.
    #[test]
    fn test_make_error_response() {
        let response = make_error_response(2, -32601, "Method not found");
        assert_eq!(response.id, 2);
        assert!(response.result.is_none());
        let error = response.error.unwrap();
        assert_eq!(error.code, -32601);
    }
}