1use crate::agent::agent::{Agent, TurnEvent};
19use crate::config::Config;
20use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::Mutex;
28use tracing::{debug, error, info, warn};
29use uuid::Uuid;
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(default)]
36pub struct AcpServerConfig {
37 pub max_sessions: usize,
39 pub session_timeout_secs: u64,
41}
42
43impl Default for AcpServerConfig {
44 fn default() -> Self {
45 Self {
46 max_sessions: 10,
47 session_timeout_secs: 3600,
48 }
49 }
50}
51
52#[derive(Debug, Deserialize)]
55struct JsonRpcRequest {
56 jsonrpc: String,
57 method: String,
58 #[serde(default)]
59 params: Value,
60 id: Option<Value>,
61}
62
63#[derive(Debug, Serialize)]
64struct JsonRpcResponse {
65 jsonrpc: &'static str,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 result: Option<Value>,
68 #[serde(skip_serializing_if = "Option::is_none")]
69 error: Option<JsonRpcError>,
70 id: Value,
71}
72
73#[derive(Debug, Serialize)]
74struct JsonRpcNotification {
75 jsonrpc: &'static str,
76 method: &'static str,
77 params: Value,
78}
79
80#[derive(Debug, Serialize)]
81struct JsonRpcError {
82 code: i32,
83 message: String,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 data: Option<Value>,
86}
87
88const PARSE_ERROR: i32 = -32700;
90const INVALID_REQUEST: i32 = -32600;
91const METHOD_NOT_FOUND: i32 = -32601;
92const INVALID_PARAMS: i32 = -32602;
93const INTERNAL_ERROR: i32 = -32603;
94
95const SESSION_NOT_FOUND: i32 = -32000;
97const SESSION_LIMIT_REACHED: i32 = -32001;
98
99struct Session {
102 agent: Agent,
103 created_at: Instant,
104 last_active: Instant,
105 workspace_dir: String,
106}
107
108pub struct AcpServer {
111 config: Config,
112 acp_config: AcpServerConfig,
113 sessions: Arc<Mutex<HashMap<String, Session>>>,
114}
115
116impl AcpServer {
117 pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
118 Self {
119 config,
120 acp_config,
121 sessions: Arc::new(Mutex::new(HashMap::new())),
122 }
123 }
124
125 pub async fn run(&self) -> Result<()> {
128 info!(
129 "ACP server starting (max_sessions={}, timeout={}s)",
130 self.acp_config.max_sessions, self.acp_config.session_timeout_secs
131 );
132
133 let stdin = tokio::io::stdin();
134 let mut reader = BufReader::new(stdin);
135 let mut line = String::new();
136
137 let sessions = Arc::clone(&self.sessions);
139 let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
140 tokio::spawn(async move {
141 let mut interval = tokio::time::interval(Duration::from_secs(60));
142 loop {
143 interval.tick().await;
144 let mut sessions = sessions.lock().await;
145 let before = sessions.len();
146 sessions.retain(|id, session| {
147 let expired = session.last_active.elapsed() > timeout;
148 if expired {
149 info!("Session {id} expired after inactivity");
150 }
151 !expired
152 });
153 let reaped = before - sessions.len();
154 if reaped > 0 {
155 debug!("Reaped {reaped} expired session(s)");
156 }
157 }
158 });
159
160 loop {
161 line.clear();
162 let bytes_read = reader.read_line(&mut line).await?;
163 if bytes_read == 0 {
164 info!("ACP server: stdin closed, shutting down");
165 break;
166 }
167
168 let trimmed = line.trim();
169 if trimmed.is_empty() {
170 continue;
171 }
172
173 match serde_json::from_str::<JsonRpcRequest>(trimmed) {
174 Ok(request) => {
175 if request.jsonrpc != "2.0" {
176 if let Some(id) = request.id {
177 self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
178 .await;
179 }
180 continue;
181 }
182 self.handle_request(request).await;
183 }
184 Err(e) => {
185 warn!("Failed to parse JSON-RPC request: {e}");
186 self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
187 .await;
188 }
189 }
190 }
191
192 Ok(())
193 }
194
195 async fn handle_request(&self, request: JsonRpcRequest) {
196 let id = request.id.clone().unwrap_or(Value::Null);
197 let is_notification = request.id.is_none();
198
199 let result = match request.method.as_str() {
200 "initialize" => self.handle_initialize(&request.params),
201 "session/new" => self.handle_session_new(&request.params).await,
202 "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
203 "session/stop" => self.handle_session_stop(&request.params).await,
204 _ => Err(RpcError {
205 code: METHOD_NOT_FOUND,
206 message: format!("Method not found: {}", request.method),
207 data: None,
208 }),
209 };
210
211 if !is_notification {
213 match result {
214 Ok(value) => self.write_result(id, value).await,
215 Err(e) => self.write_error(id, e.code, &e.message).await,
216 }
217 }
218 }
219
220 fn handle_initialize(&self, _params: &Value) -> RpcResult {
223 Ok(serde_json::json!({
224 "protocolVersion": "1.0",
225 "serverInfo": {
226 "name": "construct-acp",
227 "version": env!("CARGO_PKG_VERSION"),
228 },
229 "capabilities": {
230 "streaming": true,
231 "maxSessions": self.acp_config.max_sessions,
232 "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
233 },
234 "methods": [
235 "initialize",
236 "session/new",
237 "session/prompt",
238 "session/stop",
239 ],
240 }))
241 }
242
243 async fn handle_session_new(&self, params: &Value) -> RpcResult {
244 let mut sessions = self.sessions.lock().await;
245
246 if sessions.len() >= self.acp_config.max_sessions {
247 return Err(RpcError {
248 code: SESSION_LIMIT_REACHED,
249 message: format!(
250 "Maximum session limit reached ({})",
251 self.acp_config.max_sessions
252 ),
253 data: None,
254 });
255 }
256
257 let workspace_dir = params
258 .get("cwd")
259 .or_else(|| params.get("workspaceDir"))
260 .or_else(|| params.get("workspace_dir"))
261 .and_then(|v| v.as_str())
262 .unwrap_or_else(|| self.config.workspace_dir.to_str().unwrap_or("."))
263 .to_string();
264
265 let session_id = Uuid::new_v4().to_string();
266
267 let agent = Agent::from_config(&self.config)
269 .await
270 .map_err(|e| RpcError {
271 code: INTERNAL_ERROR,
272 message: format!("Failed to create agent: {e}"),
273 data: None,
274 })?;
275
276 let now = Instant::now();
277 sessions.insert(
278 session_id.clone(),
279 Session {
280 agent,
281 created_at: now,
282 last_active: now,
283 workspace_dir: workspace_dir.clone(),
284 },
285 );
286
287 info!("Created session {session_id} (workspace: {workspace_dir})");
288
289 Ok(serde_json::json!({
290 "sessionId": session_id,
291 "workspaceDir": workspace_dir,
292 }))
293 }
294
295 async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
296 let session_id = params
297 .get("sessionId")
298 .or_else(|| params.get("session_id"))
299 .and_then(|v| v.as_str())
300 .ok_or_else(|| RpcError {
301 code: INVALID_PARAMS,
302 message: "Missing required parameter: sessionId".to_string(),
303 data: None,
304 })?
305 .to_string();
306
307 let prompt = params
308 .get("prompt")
309 .and_then(|v| v.as_str())
310 .ok_or_else(|| RpcError {
311 code: INVALID_PARAMS,
312 message: "Missing required parameter: prompt".to_string(),
313 data: None,
314 })?
315 .to_string();
316
317 let mut session = {
320 let mut sessions = self.sessions.lock().await;
321 sessions.remove(&session_id).ok_or_else(|| RpcError {
322 code: SESSION_NOT_FOUND,
323 message: format!("Session not found: {session_id}"),
324 data: None,
325 })?
326 };
327
328 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
329
330 let sessions_ref = Arc::clone(&self.sessions);
331 let sid = session_id.clone();
332
333 let turn_handle = tokio::spawn(async move {
337 let result = session.agent.turn_streamed(&prompt, event_tx).await;
338 (session, result)
339 });
340
341 while let Some(event) = event_rx.recv().await {
343 let notification = match &event {
344 TurnEvent::Chunk { delta } => JsonRpcNotification {
345 jsonrpc: "2.0",
346 method: "session/event",
347 params: serde_json::json!({
348 "sessionId": session_id,
349 "type": "chunk",
350 "content": delta,
351 }),
352 },
353 TurnEvent::ToolCall { name, args } => JsonRpcNotification {
354 jsonrpc: "2.0",
355 method: "session/event",
356 params: serde_json::json!({
357 "sessionId": session_id,
358 "type": "tool_call",
359 "name": name,
360 "args": args,
361 }),
362 },
363 TurnEvent::ToolResult { name, output } => JsonRpcNotification {
364 jsonrpc: "2.0",
365 method: "session/event",
366 params: serde_json::json!({
367 "sessionId": session_id,
368 "type": "tool_result",
369 "name": name,
370 "output": output,
371 }),
372 },
373 TurnEvent::Thinking { delta } => JsonRpcNotification {
374 jsonrpc: "2.0",
375 method: "session/event",
376 params: serde_json::json!({
377 "sessionId": session_id,
378 "type": "thinking",
379 "content": delta,
380 }),
381 },
382 TurnEvent::OperatorStatus { phase, detail } => JsonRpcNotification {
383 jsonrpc: "2.0",
384 method: "session/event",
385 params: serde_json::json!({
386 "sessionId": session_id,
387 "type": "operator_status",
388 "phase": phase,
389 "detail": detail,
390 }),
391 },
392 };
393 self.write_notification(¬ification).await;
394 }
395
396 let (mut session, turn_result) = turn_handle.await.map_err(|e| RpcError {
398 code: INTERNAL_ERROR,
399 message: format!("Agent task panicked: {e}"),
400 data: None,
401 })?;
402
403 let result = turn_result.map_err(|e| RpcError {
404 code: INTERNAL_ERROR,
405 message: format!("Agent turn failed: {e}"),
406 data: None,
407 })?;
408
409 {
411 session.last_active = Instant::now();
412 let mut sessions = sessions_ref.lock().await;
413 sessions.insert(sid, session);
414 }
415
416 Ok(serde_json::json!({
417 "sessionId": session_id,
418 "content": result,
419 }))
420 }
421
422 async fn handle_session_stop(&self, params: &Value) -> RpcResult {
423 let session_id = params
424 .get("sessionId")
425 .or_else(|| params.get("session_id"))
426 .and_then(|v| v.as_str())
427 .ok_or_else(|| RpcError {
428 code: INVALID_PARAMS,
429 message: "Missing required parameter: sessionId".to_string(),
430 data: None,
431 })?;
432
433 let mut sessions = self.sessions.lock().await;
434 if sessions.remove(session_id).is_some() {
435 info!("Stopped session {session_id}");
436 Ok(serde_json::json!({
437 "sessionId": session_id,
438 "stopped": true,
439 }))
440 } else {
441 Err(RpcError {
442 code: SESSION_NOT_FOUND,
443 message: format!("Session not found: {session_id}"),
444 data: None,
445 })
446 }
447 }
448
449 async fn write_result(&self, id: Value, result: Value) {
452 let response = JsonRpcResponse {
453 jsonrpc: "2.0",
454 result: Some(result),
455 error: None,
456 id,
457 };
458 self.write_json(&response).await;
459 }
460
461 async fn write_error(&self, id: Value, code: i32, message: &str) {
462 let response = JsonRpcResponse {
463 jsonrpc: "2.0",
464 result: None,
465 error: Some(JsonRpcError {
466 code,
467 message: message.to_string(),
468 data: None,
469 }),
470 id,
471 };
472 self.write_json(&response).await;
473 }
474
475 async fn write_notification(&self, notification: &JsonRpcNotification) {
476 self.write_json(notification).await;
477 }
478
479 async fn write_json<T: Serialize>(&self, value: &T) {
480 match serde_json::to_string(value) {
481 Ok(json) => {
482 let mut stdout = tokio::io::stdout();
483 if let Err(e) = stdout.write_all(json.as_bytes()).await {
485 error!("Failed to write to stdout: {e}");
486 return;
487 }
488 if let Err(e) = stdout.write_all(b"\n").await {
489 error!("Failed to write newline to stdout: {e}");
490 return;
491 }
492 if let Err(e) = stdout.flush().await {
493 error!("Failed to flush stdout: {e}");
494 }
495 }
496 Err(e) => {
497 error!("Failed to serialize JSON-RPC message: {e}");
498 }
499 }
500 }
501}
502
503struct RpcError {
506 code: i32,
507 message: String,
508 data: Option<Value>,
509}
510
511type RpcResult = std::result::Result<Value, RpcError>;
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516
517 #[test]
518 fn acp_server_config_defaults() {
519 let cfg = AcpServerConfig::default();
520 assert_eq!(cfg.max_sessions, 10);
521 assert_eq!(cfg.session_timeout_secs, 3600);
522 }
523
524 #[test]
525 fn acp_server_config_deserialize() {
526 let json = r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#;
527 let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
528 assert_eq!(cfg.max_sessions, 5);
529 assert_eq!(cfg.session_timeout_secs, 1800);
530 }
531
532 #[test]
533 fn acp_server_config_deserialize_partial() {
534 let json = r#"{"max_sessions": 3}"#;
535 let cfg: AcpServerConfig = serde_json::from_str(json).unwrap();
536 assert_eq!(cfg.max_sessions, 3);
537 assert_eq!(cfg.session_timeout_secs, 3600);
538 }
539
540 #[test]
541 fn json_rpc_request_parse() {
542 let json = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
543 let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
544 assert_eq!(req.method, "initialize");
545 assert_eq!(req.id, Some(Value::Number(1.into())));
546 }
547
548 #[test]
549 fn json_rpc_request_parse_notification() {
550 let json = r#"{"jsonrpc":"2.0","method":"session/event","params":{}}"#;
551 let req: JsonRpcRequest = serde_json::from_str(json).unwrap();
552 assert_eq!(req.method, "session/event");
553 assert!(req.id.is_none());
554 }
555
556 #[test]
557 fn json_rpc_response_serialize() {
558 let resp = JsonRpcResponse {
559 jsonrpc: "2.0",
560 result: Some(serde_json::json!({"status": "ok"})),
561 error: None,
562 id: Value::Number(1.into()),
563 };
564 let json = serde_json::to_string(&resp).unwrap();
565 let parsed: Value = serde_json::from_str(&json).unwrap();
566 assert_eq!(parsed["jsonrpc"], "2.0");
567 assert!(parsed.get("result").is_some());
568 assert!(parsed.get("error").is_none());
569 assert_eq!(parsed["id"], 1);
570 }
571
572 #[test]
573 fn json_rpc_error_response_serialize() {
574 let resp = JsonRpcResponse {
575 jsonrpc: "2.0",
576 result: None,
577 error: Some(JsonRpcError {
578 code: METHOD_NOT_FOUND,
579 message: "Method not found".to_string(),
580 data: None,
581 }),
582 id: Value::Number(1.into()),
583 };
584 let json = serde_json::to_string(&resp).unwrap();
585 let parsed: Value = serde_json::from_str(&json).unwrap();
586 assert!(parsed.get("error").is_some());
587 assert_eq!(parsed["error"]["code"], -32601);
588 assert!(parsed.get("result").is_none());
589 }
590
591 #[test]
592 fn json_rpc_notification_serialize() {
593 let notif = JsonRpcNotification {
594 jsonrpc: "2.0",
595 method: "session/event",
596 params: serde_json::json!({"type": "chunk", "content": "hello"}),
597 };
598 let json = serde_json::to_string(¬if).unwrap();
599 assert!(json.contains(r#""method":"session/event""#));
600 assert!(json.contains(r#""content":"hello""#));
601 }
602}