use std::io::{self, BufRead, Write};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use agent_code_lib::query::{QueryEngine, StreamSink};
/// A single incoming JSON-RPC request, as deserialized from one line of input.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version tag sent by the client.
    ///
    /// The field has no serde default, so it must be present for a request to
    /// parse, but the code in this file never reads it afterwards.
    #[allow(dead_code)] // deserialized for validation only, never read
    jsonrpc: String,

    /// Correlation id that is passed back in the matching response.
    id: Option<serde_json::Value>,

    /// Name of the method the dispatcher matches on.
    method: String,

    /// Method arguments; falls back to the default `Value` (JSON `null`) when
    /// the member is omitted.
    #[serde(default)]
    params: serde_json::Value,
}
/// Outgoing JSON-RPC 2.0 response envelope.
///
/// The constructors on this type populate exactly one of `result` / `error`;
/// whichever is `None` is left out of the serialized object.
#[derive(Debug, Serialize)]
struct JsonRpcResponse {
    /// Protocol version tag (the constructors always set `"2.0"`).
    jsonrpc: &'static str,

    /// Id of the request being answered.
    ///
    /// This member is always serialized: JSON-RPC 2.0 requires `id` in every
    /// response and mandates `null` when the request id could not be
    /// determined (e.g. a parse error), so `None` is emitted as `null` rather
    /// than being dropped from the object.
    id: Option<serde_json::Value>,

    /// Success payload; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<serde_json::Value>,

    /// Failure payload; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
}
/// The `error` member of a failed [`JsonRpcResponse`].
#[derive(Debug, Serialize)]
struct JsonRpcError {
    /// Numeric error code. This file emits `-32700` (parse error), `-32601`
    /// (method not found), `-32602` (invalid params) and `-32000` (turn failed).
    code: i32,

    /// Human-readable description of what went wrong.
    message: String,
}
impl JsonRpcResponse {
fn success(id: Option<serde_json::Value>, result: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0",
id,
result: Some(result),
error: None,
}
}
fn error(id: Option<serde_json::Value>, code: i32, message: String) -> Self {
Self {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcError { code, message }),
}
}
}
/// Event sink used for one `message` request.
///
/// It forwards every event as a serialized JSON-RPC notification over
/// `notify_tx` and, in parallel, accumulates the data needed to build the
/// final response once the turn is over.
struct AcpSink {
    /// Channel on which serialized notification lines are queued.
    notify_tx: tokio::sync::mpsc::UnboundedSender<String>,

    /// Concatenation of all text deltas (plus any appended error markers).
    text: std::sync::Mutex<String>,

    /// Names of the tools that were started, without duplicates, in the order
    /// they were first seen.
    tools: std::sync::Mutex<Vec<String>>,
}
impl AcpSink {
    /// Creates a sink with empty text and tool buffers that queues its
    /// notifications on `notify_tx`.
    fn new(notify_tx: tokio::sync::mpsc::UnboundedSender<String>) -> Self {
        Self {
            notify_tx,
            text: std::sync::Mutex::default(),
            tools: std::sync::Mutex::default(),
        }
    }

    /// Queues a JSON-RPC notification (a message without an `id`) named
    /// `method` with the given `params`.
    ///
    /// Delivery is best effort: if serialization fails an empty string is
    /// queued instead, and a failed send is ignored.
    fn notify(&self, method: &str, params: serde_json::Value) {
        let envelope = serde_json::json!({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        });
        let line = serde_json::to_string(&envelope).unwrap_or_default();
        let _ = self.notify_tx.send(line);
    }
}
impl StreamSink for AcpSink {
fn on_text(&self, text: &str) {
if let Ok(mut t) = self.text.lock() {
t.push_str(text);
}
self.notify("events/text_delta", serde_json::json!({ "text": text }));
}
fn on_tool_start(&self, name: &str, _input: &serde_json::Value) {
if let Ok(mut tools) = self.tools.lock()
&& !tools.contains(&name.to_string())
{
tools.push(name.to_string());
}
self.notify("events/tool_start", serde_json::json!({ "name": name }));
}
fn on_tool_result(&self, name: &str, result: &agent_code_lib::tools::ToolResult) {
self.notify(
"events/tool_result",
serde_json::json!({
"name": name,
"is_error": result.is_error,
}),
);
}
fn on_thinking(&self, text: &str) {
self.notify("events/thinking", serde_json::json!({ "text": text }));
}
fn on_turn_complete(&self, turn: usize) {
self.notify("events/turn_complete", serde_json::json!({ "turn": turn }));
}
fn on_error(&self, error: &str) {
if let Ok(mut t) = self.text.lock() {
t.push_str(&format!("\n[Error: {error}]"));
}
self.notify("events/error", serde_json::json!({ "message": error }));
}
fn on_usage(&self, usage: &agent_code_lib::llm::message::Usage) {
self.notify(
"events/usage",
serde_json::json!({
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
}),
);
}
fn on_compact(&self, freed_tokens: u64) {
self.notify(
"events/compact",
serde_json::json!({ "freed_tokens": freed_tokens }),
);
}
fn on_warning(&self, msg: &str) {
self.notify("events/warning", serde_json::json!({ "message": msg }));
}
}
/// Answers `initialize` with the server name, crate version, protocol version
/// and the advertised capability flags.
fn handle_initialize(id: Option<serde_json::Value>) -> JsonRpcResponse {
    let capabilities = serde_json::json!({
        "streaming": true,
        "tools": true,
        "thinking": true,
    });
    let server_info = serde_json::json!({
        "name": "agent-code",
        // Resolved at compile time from the crate manifest.
        "version": env!("CARGO_PKG_VERSION"),
        "protocol_version": "1",
        "capabilities": capabilities,
    });
    JsonRpcResponse::success(id, server_info)
}
async fn handle_message(
id: Option<serde_json::Value>,
params: &serde_json::Value,
engine: &Arc<Mutex<QueryEngine>>,
notify_tx: &tokio::sync::mpsc::UnboundedSender<String>,
) -> JsonRpcResponse {
let content = match params.get("content").and_then(|v| v.as_str()) {
Some(c) => c.to_string(),
None => {
return JsonRpcResponse::error(
id,
-32602,
"Invalid params: missing 'content' string".to_string(),
);
}
};
let sink = Arc::new(AcpSink::new(notify_tx.clone()));
let sink_ref: &dyn StreamSink = &*sink;
let mut engine = engine.lock().await;
let turn_result = engine.run_turn_with_sink(&content, sink_ref).await;
let response_text = sink.text.lock().map(|t| t.clone()).unwrap_or_default();
let tools_used = sink.tools.lock().map(|t| t.clone()).unwrap_or_default();
let state = engine.state();
let turn_count = state.turn_count;
let cost_usd = state.total_cost_usd;
if let Err(e) = turn_result {
return JsonRpcResponse::error(id, -32000, format!("Turn failed: {e}"));
}
JsonRpcResponse::success(
id,
serde_json::json!({
"response": response_text,
"turn_count": turn_count,
"tools_used": tools_used,
"cost_usd": cost_usd,
}),
)
}
/// Handles `status`: reports a snapshot of the engine state (session id,
/// model, working directory, turn and message counts, total cost and the
/// plan-mode flag).
///
/// Waits for the engine mutex before reading the state.
async fn handle_status(
    id: Option<serde_json::Value>,
    engine: &Arc<Mutex<QueryEngine>>,
) -> JsonRpcResponse {
    let guard = engine.lock().await;
    let state = guard.state();
    let snapshot = serde_json::json!({
        "session_id": state.session_id,
        "model": state.config.api.model,
        "cwd": state.cwd,
        "turn_count": state.turn_count,
        "message_count": state.messages.len(),
        "cost_usd": state.total_cost_usd,
        "plan_mode": state.plan_mode,
    });
    JsonRpcResponse::success(id, snapshot)
}
/// Handles `cancel`: calls `cancel()` on the engine and acknowledges with
/// `{"cancelled": true}`.
///
/// NOTE(review): this waits on the same engine mutex that `handle_message`
/// keeps locked while a turn is running, so a cancel request cannot reach the
/// engine mid-turn through this path — confirm whether that is intended.
async fn handle_cancel(
    id: Option<serde_json::Value>,
    engine: &Arc<Mutex<QueryEngine>>,
) -> JsonRpcResponse {
    engine.lock().await.cancel();
    let ack = serde_json::json!({ "cancelled": true });
    JsonRpcResponse::success(id, ack)
}
/// Writes `msg` followed by a newline to stdout and flushes.
///
/// The stdout lock is held across the whole message so the payload and its
/// terminating newline are emitted together. All I/O errors are deliberately
/// ignored (best-effort output).
fn write_to_stdout(msg: &str) {
    let mut handle = io::stdout().lock();
    for chunk in [msg.as_bytes(), b"\n".as_slice()] {
        let _ = handle.write_all(chunk);
    }
    let _ = handle.flush();
}
/// Serializes `resp` to JSON and writes it as one line on stdout.
///
/// If serialization fails, an empty line is written instead.
fn write_response(resp: &JsonRpcResponse) {
    let line = serde_json::to_string(resp).unwrap_or_default();
    write_to_stdout(line.as_str());
}
/// Runs the line-delimited JSON-RPC server over stdin/stdout until a
/// `shutdown` request is handled or stdin is closed.
///
/// Structure:
/// * a dedicated OS thread reads stdin line by line (blocking I/O), skips
///   blank lines, parses each line as a [`JsonRpcRequest`] and forwards it to
///   the dispatch loop; unparseable lines are answered immediately with a
///   `-32700` parse-error response;
/// * a writer task drains the outgoing channel and writes each queued line to
///   stdout;
/// * the dispatch loop handles requests strictly one at a time and queues each
///   response on the same outgoing channel that carries the event
///   notifications, so the events emitted during a request are written before
///   that request's response.
///
/// Supported methods: `initialize`, `message`, `status`, `cancel` and
/// `shutdown`; anything else yields a `-32601` error.
///
/// # Errors
///
/// The current implementation has no failing path and always returns `Ok(())`.
pub async fn run_acp(engine: QueryEngine) -> anyhow::Result<()> {
    let engine = Arc::new(Mutex::new(engine));
    let (req_tx, mut req_rx) = tokio::sync::mpsc::unbounded_channel::<JsonRpcRequest>();
    let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::<String>();

    // The reader thread owns the *only* request sender. When stdin reaches
    // EOF the thread exits, the sender is dropped, and `req_rx.recv()` below
    // yields `None` so the server terminates instead of waiting forever.
    std::thread::spawn(move || {
        for line in io::stdin().lock().lines() {
            let Ok(line) = line else { break };
            if line.trim().is_empty() {
                continue;
            }
            match serde_json::from_str::<JsonRpcRequest>(&line) {
                Ok(req) => {
                    // The dispatch loop is gone; nothing left to read for.
                    if req_tx.send(req).is_err() {
                        break;
                    }
                }
                Err(e) => {
                    // Written directly rather than through the outgoing
                    // channel: holding a channel sender in this thread would
                    // keep the writer task alive for as long as stdin is open.
                    let err = JsonRpcResponse::error(None, -32700, format!("Parse error: {e}"));
                    write_response(&err);
                }
            }
        }
    });

    let writer = tokio::spawn(async move {
        while let Some(msg) = notify_rx.recv().await {
            write_to_stdout(&msg);
        }
    });

    while let Some(req) = req_rx.recv().await {
        let is_shutdown = req.method == "shutdown";
        let response = match req.method.as_str() {
            "initialize" => handle_initialize(req.id),
            "message" => handle_message(req.id, &req.params, &engine, &notify_tx).await,
            "status" => handle_status(req.id, &engine).await,
            "cancel" => handle_cancel(req.id, &engine).await,
            "shutdown" => JsonRpcResponse::success(req.id, serde_json::json!({ "ok": true })),
            other => JsonRpcResponse::error(req.id, -32601, format!("Method not found: {other}")),
        };

        // Queue the response behind any notifications already in the channel
        // so the writer task emits them in order.
        let _ = notify_tx.send(serde_json::to_string(&response).unwrap_or_default());

        if is_shutdown {
            break;
        }
    }

    // Close the outgoing channel and wait for the writer to flush whatever is
    // still queued (including the shutdown acknowledgement) before returning.
    drop(notify_tx);
    let _ = writer.await;

    Ok(())
}