Skip to main content

construct/channels/
acp_server.rs

1//! ACP (Agent Control Protocol) Server — JSON-RPC 2.0 over stdio.
2//!
3//! Provides an IDE-friendly interface for spawning and managing isolated agent
4//! sessions. Each session wraps an [`Agent`] built from the global config with
5//! streaming support via JSON-RPC notifications.
6//!
7//! ## Protocol
8//!
9//! Requests and responses are newline-delimited JSON objects on stdin/stdout.
10//!
11//! | Method            | Description                              |
12//! |-------------------|------------------------------------------|
13//! | `initialize`      | Handshake — returns server capabilities  |
14//! | `session/new`     | Create an isolated agent session          |
15//! | `session/prompt`  | Send a prompt, stream back events         |
16//! | `session/stop`    | Gracefully terminate a session            |
17
18use crate::agent::agent::{Agent, TurnEvent};
19use crate::config::Config;
20use anyhow::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::Mutex;
28use tracing::{debug, error, info, warn};
29use uuid::Uuid;
30
31// ── Configuration ────────────────────────────────────────────────
32
/// ACP server configuration (optional `[acp]` section in config.toml).
///
/// `#[serde(default)]` is applied at the struct level, so any field missing
/// from the serialized form is filled in from the [`Default`] implementation
/// rather than causing a deserialization error.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcpServerConfig {
    /// Maximum number of concurrent sessions. Default: 10.
    ///
    /// `session/new` is rejected with `SESSION_LIMIT_REACHED` once the session
    /// map holds this many entries.
    pub max_sessions: usize,
    /// Session inactivity timeout in seconds. Default: 3600 (1 hour).
    ///
    /// Sessions whose last activity is older than this are dropped by the
    /// background reaper started in `AcpServer::run`.
    pub session_timeout_secs: u64,
}
42
43impl Default for AcpServerConfig {
44    fn default() -> Self {
45        Self {
46            max_sessions: 10,
47            session_timeout_secs: 3600,
48        }
49    }
50}
51
52// ── JSON-RPC types ───────────────────────────────────────────────
53
/// An incoming JSON-RPC message as read from one line of stdin.
///
/// Covers both requests (with an `id`) and notifications (without one).
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version string; the server only accepts `"2.0"`.
    jsonrpc: String,
    /// Name of the method to invoke, e.g. `session/new`.
    method: String,
    /// Method arguments. Falls back to `Value::Null` when the field is absent.
    #[serde(default)]
    params: Value,
    /// Request identifier echoed back in the response. `None` marks the
    /// message as a notification, for which no response is written.
    id: Option<Value>,
}
62
/// An outgoing JSON-RPC response.
///
/// The helpers in this file populate exactly one of `result` or `error`; the
/// unset member is omitted from the serialized object entirely.
#[derive(Debug, Serialize)]
struct JsonRpcResponse {
    /// Protocol version; always written as `"2.0"` by this file.
    jsonrpc: &'static str,
    /// Success payload. Skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    /// Failure payload. Skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
    /// Identifier of the request being answered (`Value::Null` when the
    /// request could not be parsed far enough to recover one).
    id: Value,
}
72
/// An outgoing JSON-RPC notification: a message with no `id`, to which the
/// peer is not expected to reply. Used here to stream `session/event` updates
/// while a prompt is being processed.
#[derive(Debug, Serialize)]
struct JsonRpcNotification {
    /// Protocol version; always written as `"2.0"` by this file.
    jsonrpc: &'static str,
    /// Notification method name.
    method: &'static str,
    /// Notification payload.
    params: Value,
}
79
/// The `error` member of a failed [`JsonRpcResponse`].
#[derive(Debug, Serialize)]
struct JsonRpcError {
    /// Numeric error code; see the constants defined below this type.
    code: i32,
    /// Short human-readable description of the failure.
    message: String,
    /// Optional structured detail. Skipped during serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    data: Option<Value>,
}
87
// Standard JSON-RPC 2.0 error codes (values fixed by the specification).

/// The received text was not valid JSON.
const PARSE_ERROR: i32 = -32700;
/// The JSON was valid but is not an acceptable request (e.g. wrong version).
const INVALID_REQUEST: i32 = -32600;
/// The requested method name is not handled by this server.
const METHOD_NOT_FOUND: i32 = -32601;
/// A required parameter was missing or had the wrong type.
const INVALID_PARAMS: i32 = -32602;
/// The server failed while executing an otherwise valid request.
const INTERNAL_ERROR: i32 = -32603;

// Custom error codes, taken from the range the specification reserves for
// implementation-defined server errors (-32000 to -32099).

/// The supplied session id does not refer to a live session.
const SESSION_NOT_FOUND: i32 = -32000;
/// `session/new` was refused because `max_sessions` sessions already exist.
const SESSION_LIMIT_REACHED: i32 = -32001;
98
99// ── Session state ────────────────────────────────────────────────
100
/// State kept for one live agent session, stored in the server's session map
/// under its session id.
struct Session {
    /// The agent that executes prompts for this session.
    agent: Agent,
    /// When the session was created. Not read elsewhere in this file.
    created_at: Instant,
    /// When the session last finished handling a prompt (or was created);
    /// compared against the inactivity timeout by the reaper.
    last_active: Instant,
    /// Workspace directory resolved when the session was created.
    /// Not read elsewhere in this file after being stored.
    workspace_dir: String,
}
107
108// ── ACP Server ───────────────────────────────────────────────────
109
/// JSON-RPC 2.0 server that manages agent sessions over stdin/stdout.
pub struct AcpServer {
    /// Global configuration from which each session's agent is built.
    config: Config,
    /// Session limits (maximum count and inactivity timeout).
    acp_config: AcpServerConfig,
    /// Live sessions keyed by session id. Wrapped in `Arc<Mutex<..>>` because
    /// the map is shared with the background reaper task.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
115
116impl AcpServer {
117    pub fn new(config: Config, acp_config: AcpServerConfig) -> Self {
118        Self {
119            config,
120            acp_config,
121            sessions: Arc::new(Mutex::new(HashMap::new())),
122        }
123    }
124
125    /// Run the ACP server, reading JSON-RPC requests from stdin and writing
126    /// responses/notifications to stdout.
127    pub async fn run(&self) -> Result<()> {
128        info!(
129            "ACP server starting (max_sessions={}, timeout={}s)",
130            self.acp_config.max_sessions, self.acp_config.session_timeout_secs
131        );
132
133        let stdin = tokio::io::stdin();
134        let mut reader = BufReader::new(stdin);
135        let mut line = String::new();
136
137        // Spawn session reaper
138        let sessions = Arc::clone(&self.sessions);
139        let timeout = Duration::from_secs(self.acp_config.session_timeout_secs);
140        tokio::spawn(async move {
141            let mut interval = tokio::time::interval(Duration::from_secs(60));
142            loop {
143                interval.tick().await;
144                let mut sessions = sessions.lock().await;
145                let before = sessions.len();
146                sessions.retain(|id, session| {
147                    let expired = session.last_active.elapsed() > timeout;
148                    if expired {
149                        info!("Session {id} expired after inactivity");
150                    }
151                    !expired
152                });
153                let reaped = before - sessions.len();
154                if reaped > 0 {
155                    debug!("Reaped {reaped} expired session(s)");
156                }
157            }
158        });
159
160        loop {
161            line.clear();
162            let bytes_read = reader.read_line(&mut line).await?;
163            if bytes_read == 0 {
164                info!("ACP server: stdin closed, shutting down");
165                break;
166            }
167
168            let trimmed = line.trim();
169            if trimmed.is_empty() {
170                continue;
171            }
172
173            match serde_json::from_str::<JsonRpcRequest>(trimmed) {
174                Ok(request) => {
175                    if request.jsonrpc != "2.0" {
176                        if let Some(id) = request.id {
177                            self.write_error(id, INVALID_REQUEST, "Invalid JSON-RPC version")
178                                .await;
179                        }
180                        continue;
181                    }
182                    self.handle_request(request).await;
183                }
184                Err(e) => {
185                    warn!("Failed to parse JSON-RPC request: {e}");
186                    self.write_error(Value::Null, PARSE_ERROR, &format!("Parse error: {e}"))
187                        .await;
188                }
189            }
190        }
191
192        Ok(())
193    }
194
195    async fn handle_request(&self, request: JsonRpcRequest) {
196        let id = request.id.clone().unwrap_or(Value::Null);
197        let is_notification = request.id.is_none();
198
199        let result = match request.method.as_str() {
200            "initialize" => self.handle_initialize(&request.params),
201            "session/new" => self.handle_session_new(&request.params).await,
202            "session/prompt" => self.handle_session_prompt(&request.params, &id).await,
203            "session/stop" => self.handle_session_stop(&request.params).await,
204            _ => Err(RpcError {
205                code: METHOD_NOT_FOUND,
206                message: format!("Method not found: {}", request.method),
207                data: None,
208            }),
209        };
210
211        // Only send response for requests (with id), not notifications
212        if !is_notification {
213            match result {
214                Ok(value) => self.write_result(id, value).await,
215                Err(e) => self.write_error(id, e.code, &e.message).await,
216            }
217        }
218    }
219
220    // ── Method handlers ──────────────────────────────────────────
221
222    fn handle_initialize(&self, _params: &Value) -> RpcResult {
223        Ok(serde_json::json!({
224            "protocolVersion": "1.0",
225            "serverInfo": {
226                "name": "construct-acp",
227                "version": env!("CARGO_PKG_VERSION"),
228            },
229            "capabilities": {
230                "streaming": true,
231                "maxSessions": self.acp_config.max_sessions,
232                "sessionTimeoutSecs": self.acp_config.session_timeout_secs,
233            },
234            "methods": [
235                "initialize",
236                "session/new",
237                "session/prompt",
238                "session/stop",
239            ],
240        }))
241    }
242
243    async fn handle_session_new(&self, params: &Value) -> RpcResult {
244        let mut sessions = self.sessions.lock().await;
245
246        if sessions.len() >= self.acp_config.max_sessions {
247            return Err(RpcError {
248                code: SESSION_LIMIT_REACHED,
249                message: format!(
250                    "Maximum session limit reached ({})",
251                    self.acp_config.max_sessions
252                ),
253                data: None,
254            });
255        }
256
257        let workspace_dir = params
258            .get("cwd")
259            .or_else(|| params.get("workspaceDir"))
260            .or_else(|| params.get("workspace_dir"))
261            .and_then(|v| v.as_str())
262            .unwrap_or_else(|| self.config.workspace_dir.to_str().unwrap_or("."))
263            .to_string();
264
265        let session_id = Uuid::new_v4().to_string();
266
267        // Build agent from global config
268        let agent = Agent::from_config(&self.config)
269            .await
270            .map_err(|e| RpcError {
271                code: INTERNAL_ERROR,
272                message: format!("Failed to create agent: {e}"),
273                data: None,
274            })?;
275
276        let now = Instant::now();
277        sessions.insert(
278            session_id.clone(),
279            Session {
280                agent,
281                created_at: now,
282                last_active: now,
283                workspace_dir: workspace_dir.clone(),
284            },
285        );
286
287        info!("Created session {session_id} (workspace: {workspace_dir})");
288
289        Ok(serde_json::json!({
290            "sessionId": session_id,
291            "workspaceDir": workspace_dir,
292        }))
293    }
294
295    async fn handle_session_prompt(&self, params: &Value, _request_id: &Value) -> RpcResult {
296        let session_id = params
297            .get("sessionId")
298            .or_else(|| params.get("session_id"))
299            .and_then(|v| v.as_str())
300            .ok_or_else(|| RpcError {
301                code: INVALID_PARAMS,
302                message: "Missing required parameter: sessionId".to_string(),
303                data: None,
304            })?
305            .to_string();
306
307        let prompt = params
308            .get("prompt")
309            .and_then(|v| v.as_str())
310            .ok_or_else(|| RpcError {
311                code: INVALID_PARAMS,
312                message: "Missing required parameter: prompt".to_string(),
313                data: None,
314            })?
315            .to_string();
316
317        // Remove the session from the map so we can take mutable ownership of
318        // the Agent for the duration of the turn. It will be reinserted after.
319        let mut session = {
320            let mut sessions = self.sessions.lock().await;
321            sessions.remove(&session_id).ok_or_else(|| RpcError {
322                code: SESSION_NOT_FOUND,
323                message: format!("Session not found: {session_id}"),
324                data: None,
325            })?
326        };
327
328        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<TurnEvent>(100);
329
330        let sessions_ref = Arc::clone(&self.sessions);
331        let sid = session_id.clone();
332
333        // Run turn_streamed in a spawned task. The task takes ownership of
334        // the whole Session and returns it alongside the result so we can
335        // put the session back into the map afterwards.
336        let turn_handle = tokio::spawn(async move {
337            let result = session.agent.turn_streamed(&prompt, event_tx).await;
338            (session, result)
339        });
340
341        // Forward events as they arrive
342        while let Some(event) = event_rx.recv().await {
343            let notification = match &event {
344                TurnEvent::Chunk { delta } => JsonRpcNotification {
345                    jsonrpc: "2.0",
346                    method: "session/event",
347                    params: serde_json::json!({
348                        "sessionId": session_id,
349                        "type": "chunk",
350                        "content": delta,
351                    }),
352                },
353                TurnEvent::ToolCall { name, args } => JsonRpcNotification {
354                    jsonrpc: "2.0",
355                    method: "session/event",
356                    params: serde_json::json!({
357                        "sessionId": session_id,
358                        "type": "tool_call",
359                        "name": name,
360                        "args": args,
361                    }),
362                },
363                TurnEvent::ToolResult { name, output } => JsonRpcNotification {
364                    jsonrpc: "2.0",
365                    method: "session/event",
366                    params: serde_json::json!({
367                        "sessionId": session_id,
368                        "type": "tool_result",
369                        "name": name,
370                        "output": output,
371                    }),
372                },
373                TurnEvent::Thinking { delta } => JsonRpcNotification {
374                    jsonrpc: "2.0",
375                    method: "session/event",
376                    params: serde_json::json!({
377                        "sessionId": session_id,
378                        "type": "thinking",
379                        "content": delta,
380                    }),
381                },
382                TurnEvent::OperatorStatus { phase, detail } => JsonRpcNotification {
383                    jsonrpc: "2.0",
384                    method: "session/event",
385                    params: serde_json::json!({
386                        "sessionId": session_id,
387                        "type": "operator_status",
388                        "phase": phase,
389                        "detail": detail,
390                    }),
391                },
392            };
393            self.write_notification(&notification).await;
394        }
395
396        // Wait for the turn to complete and recover the session
397        let (mut session, turn_result) = turn_handle.await.map_err(|e| RpcError {
398            code: INTERNAL_ERROR,
399            message: format!("Agent task panicked: {e}"),
400            data: None,
401        })?;
402
403        let result = turn_result.map_err(|e| RpcError {
404            code: INTERNAL_ERROR,
405            message: format!("Agent turn failed: {e}"),
406            data: None,
407        })?;
408
409        // Put the session back
410        {
411            session.last_active = Instant::now();
412            let mut sessions = sessions_ref.lock().await;
413            sessions.insert(sid, session);
414        }
415
416        Ok(serde_json::json!({
417            "sessionId": session_id,
418            "content": result,
419        }))
420    }
421
422    async fn handle_session_stop(&self, params: &Value) -> RpcResult {
423        let session_id = params
424            .get("sessionId")
425            .or_else(|| params.get("session_id"))
426            .and_then(|v| v.as_str())
427            .ok_or_else(|| RpcError {
428                code: INVALID_PARAMS,
429                message: "Missing required parameter: sessionId".to_string(),
430                data: None,
431            })?;
432
433        let mut sessions = self.sessions.lock().await;
434        if sessions.remove(session_id).is_some() {
435            info!("Stopped session {session_id}");
436            Ok(serde_json::json!({
437                "sessionId": session_id,
438                "stopped": true,
439            }))
440        } else {
441            Err(RpcError {
442                code: SESSION_NOT_FOUND,
443                message: format!("Session not found: {session_id}"),
444                data: None,
445            })
446        }
447    }
448
449    // ── I/O helpers ──────────────────────────────────────────────
450
451    async fn write_result(&self, id: Value, result: Value) {
452        let response = JsonRpcResponse {
453            jsonrpc: "2.0",
454            result: Some(result),
455            error: None,
456            id,
457        };
458        self.write_json(&response).await;
459    }
460
461    async fn write_error(&self, id: Value, code: i32, message: &str) {
462        let response = JsonRpcResponse {
463            jsonrpc: "2.0",
464            result: None,
465            error: Some(JsonRpcError {
466                code,
467                message: message.to_string(),
468                data: None,
469            }),
470            id,
471        };
472        self.write_json(&response).await;
473    }
474
475    async fn write_notification(&self, notification: &JsonRpcNotification) {
476        self.write_json(notification).await;
477    }
478
479    async fn write_json<T: Serialize>(&self, value: &T) {
480        match serde_json::to_string(value) {
481            Ok(json) => {
482                let mut stdout = tokio::io::stdout();
483                // Write as a single line followed by newline
484                if let Err(e) = stdout.write_all(json.as_bytes()).await {
485                    error!("Failed to write to stdout: {e}");
486                    return;
487                }
488                if let Err(e) = stdout.write_all(b"\n").await {
489                    error!("Failed to write newline to stdout: {e}");
490                    return;
491                }
492                if let Err(e) = stdout.flush().await {
493                    error!("Failed to flush stdout: {e}");
494                }
495            }
496            Err(e) => {
497                error!("Failed to serialize JSON-RPC message: {e}");
498            }
499        }
500    }
501}
502
503// ── Error helper ─────────────────────────────────────────────────
504
/// Internal error value returned by the method handlers; converted into a
/// JSON-RPC error response by the request dispatcher.
struct RpcError {
    /// Numeric JSON-RPC error code.
    code: i32,
    /// Human-readable description sent to the client.
    message: String,
    /// Optional structured detail. Every construction site in this file sets
    /// it to `None`, and it is not copied into the response that is written.
    data: Option<Value>,
}
510
/// Result type of every method handler: the JSON value for the response's
/// `result` member on success, or an [`RpcError`] on failure. Spelled with
/// `std::result::Result` because the bare `Result` in this file is `anyhow`'s.
type RpcResult = std::result::Result<Value, RpcError>;
512
#[cfg(test)]
mod tests {
    use super::*;

    /// The stock configuration allows 10 sessions with a one-hour timeout.
    #[test]
    fn acp_server_config_defaults() {
        let AcpServerConfig {
            max_sessions,
            session_timeout_secs,
        } = AcpServerConfig::default();
        assert_eq!(max_sessions, 10);
        assert_eq!(session_timeout_secs, 3600);
    }

    /// Both fields are read from the serialized form when present.
    #[test]
    fn acp_server_config_deserialize() {
        let parsed: AcpServerConfig =
            serde_json::from_str(r#"{"max_sessions": 5, "session_timeout_secs": 1800}"#).unwrap();
        assert_eq!(parsed.max_sessions, 5);
        assert_eq!(parsed.session_timeout_secs, 1800);
    }

    /// A field missing from the serialized form takes its default value.
    #[test]
    fn acp_server_config_deserialize_partial() {
        let parsed: AcpServerConfig = serde_json::from_str(r#"{"max_sessions": 3}"#).unwrap();
        assert_eq!(parsed.max_sessions, 3);
        assert_eq!(parsed.session_timeout_secs, 3600);
    }

    /// A request with an id keeps both its method name and the id.
    #[test]
    fn json_rpc_request_parse() {
        let raw = r#"{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}"#;
        let request: JsonRpcRequest = serde_json::from_str(raw).unwrap();
        assert_eq!(request.method, "initialize");
        assert_eq!(request.id, Some(Value::Number(1.into())));
    }

    /// A message without an id parses as a notification (`id` is `None`).
    #[test]
    fn json_rpc_request_parse_notification() {
        let raw = r#"{"jsonrpc":"2.0","method":"session/event","params":{}}"#;
        let request: JsonRpcRequest = serde_json::from_str(raw).unwrap();
        assert_eq!(request.method, "session/event");
        assert!(request.id.is_none());
    }

    /// A success response serializes `result` and omits `error`.
    #[test]
    fn json_rpc_response_serialize() {
        let response = JsonRpcResponse {
            jsonrpc: "2.0",
            result: Some(serde_json::json!({"status": "ok"})),
            error: None,
            id: Value::Number(1.into()),
        };
        let encoded = serde_json::to_string(&response).unwrap();
        let decoded: Value = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded["jsonrpc"], "2.0");
        assert!(decoded.get("result").is_some());
        assert!(decoded.get("error").is_none());
        assert_eq!(decoded["id"], 1);
    }

    /// An error response serializes `error` (with its code) and omits `result`.
    #[test]
    fn json_rpc_error_response_serialize() {
        let failure = JsonRpcError {
            code: METHOD_NOT_FOUND,
            message: "Method not found".to_string(),
            data: None,
        };
        let response = JsonRpcResponse {
            jsonrpc: "2.0",
            result: None,
            error: Some(failure),
            id: Value::Number(1.into()),
        };
        let encoded = serde_json::to_string(&response).unwrap();
        let decoded: Value = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.get("error").is_some());
        assert_eq!(decoded["error"]["code"], -32601);
        assert!(decoded.get("result").is_none());
    }

    /// A notification serializes its method name and payload compactly.
    #[test]
    fn json_rpc_notification_serialize() {
        let notification = JsonRpcNotification {
            jsonrpc: "2.0",
            method: "session/event",
            params: serde_json::json!({"type": "chunk", "content": "hello"}),
        };
        let encoded = serde_json::to_string(&notification).unwrap();
        for expected in [r#""method":"session/event""#, r#""content":"hello""#] {
            assert!(encoded.contains(expected));
        }
    }
}