Skip to main content

claude_code_cli_acp/acp/
server.rs

1use std::{
2    collections::HashMap,
3    path::PathBuf,
4    sync::{Arc, Mutex},
5};
6
7use acp::schema::{
8    AgentCapabilities, AuthMethod, AuthMethodAgent, AuthMethodTerminal, AuthenticateRequest,
9    AuthenticateResponse, CancelNotification, ClientCapabilities, CloseSessionRequest,
10    CloseSessionResponse, Implementation, InitializeRequest, InitializeResponse,
11    ListSessionsRequest, ListSessionsResponse, LoadSessionRequest, LoadSessionResponse,
12    McpCapabilities, NewSessionRequest, NewSessionResponse, PromptCapabilities, PromptRequest,
13    PromptResponse, ProtocolVersion, SessionCapabilities, SessionCloseCapabilities, SessionId,
14    SessionInfo, SessionListCapabilities, SessionNotification, SetSessionConfigOptionRequest,
15    SetSessionConfigOptionResponse, SetSessionModeRequest, SetSessionModeResponse,
16    SetSessionModelRequest, SetSessionModelResponse, StopReason,
17};
18use acp::{Agent, Client, ConnectTo, ConnectionTo, Error};
19use agent_client_protocol as acp;
20use agent_client_protocol::ByteStreams;
21use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
25use crate::{
26    acp::updates,
27    compat::claude_probe::ClaudeCli,
28    session::manager::{ManagedSession, SessionManager, TurnOptions},
29    transcript::tailer::TranscriptLocator,
30};
31
/// Method id advertised for the agent-side login method built in `auth_methods`
/// and accepted by `AcpServer::authenticate`.
const CLAUDE_CODE_LOGIN_AUTH_ID: &str = "claude-code-login";
/// Method id advertised for the terminal login method, which `auth_methods` only
/// offers when the client reports terminal auth support.
const CLAUDE_CODE_TERMINAL_AUTH_ID: &str = "claude-code-terminal-login";
34
/// State shared by every ACP request handler registered in `AcpServer::serve`.
pub struct AcpServer {
    /// Client capabilities recorded by the most recent `initialize` request.
    client_capabilities: Arc<Mutex<ClientCapabilities>>,
    /// Sessions currently tracked by this server, keyed by session id.
    sessions: Arc<Mutex<HashMap<SessionId, Arc<ManagedSession>>>>,
    /// Creates and loads the `ManagedSession` values stored in `sessions`.
    manager: SessionManager,
}
40
41impl AcpServer {
42    pub fn new() -> Self {
43        Self {
44            client_capabilities: Arc::default(),
45            sessions: Arc::default(),
46            manager: SessionManager::new(),
47        }
48    }
49
50    pub async fn serve_stdio(self) -> std::io::Result<()> {
51        let stdin = tokio::io::stdin().compat();
52        let stdout = tokio::io::stdout().compat_write();
53        Arc::new(self)
54            .serve(ByteStreams::new(stdout, stdin))
55            .await
56            .map_err(|e| std::io::Error::other(format!("ACP error: {e}")))
57    }
58
59    pub async fn initialize_for_test(
60        &self,
61        client_capabilities: ClientCapabilities,
62    ) -> Result<InitializeResponse, Error> {
63        self.initialize(
64            InitializeRequest::new(ProtocolVersion::V1)
65                .client_capabilities(client_capabilities)
66                .client_info(Implementation::new("test-client", "0.0.0")),
67        )
68        .await
69    }
70
71    pub fn create_session_for_test(&self, cwd: PathBuf) -> anyhow::Result<SessionId> {
72        let session_id = SessionId::new(Uuid::new_v4().to_string());
73        let session = self
74            .manager
75            .create_session(session_id.clone(), cwd, Vec::new())?;
76        self.sessions
77            .lock()
78            .unwrap()
79            .insert(session_id.clone(), session);
80        Ok(session_id)
81    }
82
83    pub async fn serve(
84        self: Arc<Self>,
85        transport: impl ConnectTo<Agent> + 'static,
86    ) -> acp::Result<()> {
87        let server = self;
88        Agent
89            .builder()
90            .name("claude-code-cli-acp")
91            .on_receive_request(
92                {
93                    let server = server.clone();
94                    async move |request: InitializeRequest, responder, _cx| {
95                        responder.respond_with_result(server.initialize(request).await)
96                    }
97                },
98                acp::on_receive_request!(),
99            )
100            .on_receive_request(
101                {
102                    let server = server.clone();
103                    async move |request: AuthenticateRequest,
104                                responder,
105                                cx: ConnectionTo<Client>| {
106                        let server = server.clone();
107                        cx.spawn(async move {
108                            responder.respond_with_result(server.authenticate(request).await)
109                        })?;
110                        Ok(())
111                    }
112                },
113                acp::on_receive_request!(),
114            )
115            .on_receive_request(
116                {
117                    let server = server.clone();
118                    async move |request: NewSessionRequest, responder, cx: ConnectionTo<Client>| {
119                        let server = server.clone();
120                        let session_cx = cx.clone();
121                        cx.spawn(async move {
122                            responder
123                                .respond_with_result(server.new_session(request, session_cx).await)
124                        })?;
125                        Ok(())
126                    }
127                },
128                acp::on_receive_request!(),
129            )
130            .on_receive_request(
131                {
132                    let server = server.clone();
133                    async move |request: LoadSessionRequest, responder, cx: ConnectionTo<Client>| {
134                        let server = server.clone();
135                        let session_cx = cx.clone();
136                        cx.spawn(async move {
137                            responder
138                                .respond_with_result(server.load_session(request, session_cx).await)
139                        })?;
140                        Ok(())
141                    }
142                },
143                acp::on_receive_request!(),
144            )
145            .on_receive_request(
146                {
147                    let server = server.clone();
148                    async move |request: ListSessionsRequest,
149                                responder,
150                                cx: ConnectionTo<Client>| {
151                        let server = server.clone();
152                        cx.spawn(async move {
153                            responder.respond_with_result(server.list_sessions(request).await)
154                        })?;
155                        Ok(())
156                    }
157                },
158                acp::on_receive_request!(),
159            )
160            .on_receive_request(
161                {
162                    let server = server.clone();
163                    async move |request: CloseSessionRequest,
164                                responder,
165                                cx: ConnectionTo<Client>| {
166                        let server = server.clone();
167                        cx.spawn(async move {
168                            responder.respond_with_result(server.close_session(request).await)
169                        })?;
170                        Ok(())
171                    }
172                },
173                acp::on_receive_request!(),
174            )
175            .on_receive_request(
176                {
177                    let server = server.clone();
178                    async move |request: PromptRequest, responder, cx: ConnectionTo<Client>| {
179                        let server = server.clone();
180                        let prompt_cx = cx.clone();
181                        cx.spawn(async move {
182                            responder.respond_with_result(server.prompt(request, prompt_cx).await)
183                        })?;
184                        Ok(())
185                    }
186                },
187                acp::on_receive_request!(),
188            )
189            .on_receive_notification(
190                {
191                    let server = server.clone();
192                    async move |notification: CancelNotification, cx: ConnectionTo<Client>| {
193                        let server = server.clone();
194                        cx.spawn(async move {
195                            if let Err(err) = server.cancel(notification).await {
196                                warn!("failed to cancel session: {err:?}");
197                            }
198                            Ok(())
199                        })?;
200                        Ok(())
201                    }
202                },
203                acp::on_receive_notification!(),
204            )
205            .on_receive_request(
206                {
207                    let server = server.clone();
208                    async move |request: SetSessionModeRequest,
209                                responder,
210                                cx: ConnectionTo<Client>| {
211                        let server = server.clone();
212                        cx.spawn(async move {
213                            responder.respond_with_result(server.set_session_mode(request).await)
214                        })?;
215                        Ok(())
216                    }
217                },
218                acp::on_receive_request!(),
219            )
220            .on_receive_request(
221                {
222                    let server = server.clone();
223                    async move |request: SetSessionModelRequest,
224                                responder,
225                                cx: ConnectionTo<Client>| {
226                        let server = server.clone();
227                        cx.spawn(async move {
228                            responder.respond_with_result(server.set_session_model(request).await)
229                        })?;
230                        Ok(())
231                    }
232                },
233                acp::on_receive_request!(),
234            )
235            .on_receive_request(
236                {
237                    let server = server.clone();
238                    async move |request: SetSessionConfigOptionRequest,
239                                responder,
240                                cx: ConnectionTo<Client>| {
241                        let server = server.clone();
242                        cx.spawn(async move {
243                            responder.respond_with_result(
244                                server.set_session_config_option(request).await,
245                            )
246                        })?;
247                        Ok(())
248                    }
249                },
250                acp::on_receive_request!(),
251            )
252            .connect_to(transport)
253            .await
254    }
255
256    async fn initialize(&self, request: InitializeRequest) -> Result<InitializeResponse, Error> {
257        debug!(
258            "initialize requested with protocol version {:?}",
259            request.protocol_version
260        );
261        let client_capabilities = request.client_capabilities;
262        *self.client_capabilities.lock().unwrap() = client_capabilities.clone();
263
264        let mut agent_capabilities = AgentCapabilities::new()
265            .prompt_capabilities(
266                PromptCapabilities::new()
267                    .embedded_context(true)
268                    .image(false),
269            )
270            .mcp_capabilities(McpCapabilities::new().http(true))
271            .load_session(true);
272        agent_capabilities.session_capabilities = SessionCapabilities::new()
273            .close(SessionCloseCapabilities::new())
274            .list(SessionListCapabilities::new());
275
276        Ok(InitializeResponse::new(ProtocolVersion::V1)
277            .agent_capabilities(agent_capabilities)
278            .agent_info(
279                Implementation::new("claude-code-cli-acp", env!("CARGO_PKG_VERSION"))
280                    .title("Claude Code CLI ACP"),
281            )
282            .auth_methods(auth_methods(&client_capabilities)))
283    }
284
285    async fn authenticate(
286        &self,
287        request: AuthenticateRequest,
288    ) -> Result<AuthenticateResponse, Error> {
289        match request.method_id.0.as_ref() {
290            CLAUDE_CODE_LOGIN_AUTH_ID | CLAUDE_CODE_TERMINAL_AUTH_ID => {
291                ClaudeCli::from_env_or_path().version().map_err(|err| {
292                    internal_error(anyhow::anyhow!(
293                        "Claude Code CLI is not available for authentication check: {err}"
294                    ))
295                })?;
296                Ok(AuthenticateResponse::new())
297            }
298            other => {
299                Err(Error::invalid_params()
300                    .data(format!("unsupported authentication method: {other}")))
301            }
302        }
303    }
304
305    async fn new_session(
306        &self,
307        request: NewSessionRequest,
308        _cx: ConnectionTo<Client>,
309    ) -> Result<NewSessionResponse, Error> {
310        let session_id = requested_session_id(request.meta.as_ref())
311            .unwrap_or_else(|| SessionId::new(Uuid::new_v4().to_string()));
312        let session = self
313            .manager
314            .create_session(session_id.clone(), request.cwd.clone(), request.mcp_servers)
315            .map_err(internal_error)?;
316        let response = NewSessionResponse::new(session_id.clone())
317            .modes(session.modes())
318            .models(session.models())
319            .config_options(session.config_options());
320        self.sessions
321            .lock()
322            .unwrap()
323            .insert(session_id.clone(), session);
324        Ok(response)
325    }
326
327    async fn load_session(
328        &self,
329        request: LoadSessionRequest,
330        _cx: ConnectionTo<Client>,
331    ) -> Result<LoadSessionResponse, Error> {
332        if let Some(session) = self
333            .sessions
334            .lock()
335            .unwrap()
336            .get(&request.session_id)
337            .cloned()
338        {
339            return Ok(load_session_response(&session));
340        }
341
342        let locator = TranscriptLocator::default_home().map_err(internal_error)?;
343        let has_transcript = locator
344            .find_transcript(request.session_id.0.as_ref())
345            .map_err(internal_error)?
346            .is_some();
347        if !has_transcript {
348            return Err(Error::resource_not_found(None));
349        }
350
351        let session = self
352            .manager
353            .load_session(
354                request.session_id.clone(),
355                request.cwd.clone(),
356                request.mcp_servers,
357            )
358            .map_err(internal_error)?;
359        let response = load_session_response(&session);
360        self.sessions
361            .lock()
362            .unwrap()
363            .insert(request.session_id, session);
364        Ok(response)
365    }
366
367    async fn list_sessions(
368        &self,
369        request: ListSessionsRequest,
370    ) -> Result<ListSessionsResponse, Error> {
371        let cwd_filter = request.cwd.as_ref();
372        let sessions = self
373            .sessions
374            .lock()
375            .unwrap()
376            .values()
377            .filter(|session| cwd_filter.is_none_or(|cwd| cwd == session.cwd()))
378            .map(|session| {
379                SessionInfo::new(session.session_id().clone(), session.cwd().to_path_buf())
380                    .title(Some("Claude Code CLI session".to_string()))
381            })
382            .collect();
383        Ok(ListSessionsResponse::new(sessions))
384    }
385
386    async fn close_session(
387        &self,
388        request: CloseSessionRequest,
389    ) -> Result<CloseSessionResponse, Error> {
390        let session = self.sessions.lock().unwrap().remove(&request.session_id);
391        if let Some(session) = session {
392            session.shutdown().await.map_err(internal_error)?;
393        }
394        Ok(CloseSessionResponse::new())
395    }
396
397    async fn prompt(
398        &self,
399        request: PromptRequest,
400        cx: ConnectionTo<Client>,
401    ) -> Result<PromptResponse, Error> {
402        let session_id = request.session_id.clone();
403        let session = self.get_session(&session_id)?;
404        let prompt = updates::prompt_text(&request);
405        cx.send_notification(SessionNotification::new(
406            session_id.clone(),
407            updates::user_message_chunk(prompt.clone()),
408        ))?;
409        cx.send_notification(SessionNotification::new(
410            session_id.clone(),
411            updates::available_commands(session.cwd()),
412        ))?;
413
414        let permission_cx = cx.clone();
415        let permission_session_id = session_id.clone();
416        let turn = session
417            .prompt_with_permission_handler(
418                prompt,
419                TurnOptions::from_prompt_request(&request),
420                move |permission| {
421                    let permission_cx = permission_cx.clone();
422                    let permission_session_id = permission_session_id.clone();
423                    async move {
424                        let response = permission_cx
425                            .send_request(updates::permission_request(
426                                permission_session_id,
427                                &permission,
428                            ))
429                            .block_task()
430                            .await
431                            .map_err(|err| anyhow::anyhow!("permission request failed: {err}"))?;
432                        updates::permission_decision(&response.outcome).ok_or_else(|| {
433                            anyhow::anyhow!("client returned unknown permission option")
434                        })
435                    }
436                },
437            )
438            .await
439            .map_err(internal_error)?;
440
441        let client_capabilities = self.client_capabilities.lock().unwrap().clone();
442        let mut update_mapper =
443            updates::TranscriptUpdateMapper::from_client(session.cwd(), &client_capabilities);
444        for event in &turn.events {
445            for update in update_mapper.updates_for_event(event) {
446                cx.send_notification(SessionNotification::new(session_id.clone(), update))?;
447            }
448        }
449        if turn.events.is_empty()
450            && let Some(screen_text) = turn.screen_text.as_ref().filter(|text| !text.is_empty())
451        {
452            cx.send_notification(SessionNotification::new(
453                session_id,
454                updates::agent_message_chunk(screen_text.clone()),
455            ))?;
456        }
457
458        Ok(PromptResponse::new(StopReason::EndTurn))
459    }
460
461    async fn cancel(&self, notification: CancelNotification) -> Result<(), Error> {
462        self.get_session(&notification.session_id)?
463            .cancel()
464            .await
465            .map_err(internal_error)
466    }
467
468    async fn set_session_mode(
469        &self,
470        request: SetSessionModeRequest,
471    ) -> Result<SetSessionModeResponse, Error> {
472        info!(
473            "mode change requested for {}: {}",
474            request.session_id, request.mode_id
475        );
476        self.get_session(&request.session_id)?
477            .set_permission_mode(Some(request.mode_id.0.to_string()))
478            .map_err(internal_error)?;
479        Ok(SetSessionModeResponse::default())
480    }
481
482    async fn set_session_model(
483        &self,
484        request: SetSessionModelRequest,
485    ) -> Result<SetSessionModelResponse, Error> {
486        self.get_session(&request.session_id)?
487            .set_model(Some(request.model_id.0.to_string()))
488            .map_err(internal_error)?;
489        Ok(SetSessionModelResponse::default())
490    }
491
492    async fn set_session_config_option(
493        &self,
494        request: SetSessionConfigOptionRequest,
495    ) -> Result<SetSessionConfigOptionResponse, Error> {
496        info!(
497            "config option requested for {}: {}",
498            request.session_id, request.config_id.0
499        );
500        let Some(value) = request.value.as_value_id() else {
501            return Err(internal_error(anyhow::anyhow!(
502                "config option requires a value id"
503            )));
504        };
505        let session = self.get_session(&request.session_id)?;
506        let _update = session
507            .set_config_option(request.config_id.0.as_ref(), value)
508            .map_err(internal_error)?;
509        Ok(SetSessionConfigOptionResponse::new(
510            session.config_options(),
511        ))
512    }
513
514    fn get_session(&self, session_id: &SessionId) -> Result<Arc<ManagedSession>, Error> {
515        self.sessions
516            .lock()
517            .unwrap()
518            .get(session_id)
519            .cloned()
520            .ok_or_else(|| Error::resource_not_found(None))
521    }
522}
523
524fn load_session_response(session: &ManagedSession) -> LoadSessionResponse {
525    LoadSessionResponse::new()
526        .modes(session.modes())
527        .models(session.models())
528        .config_options(session.config_options())
529}
530
531fn auth_methods(client_capabilities: &ClientCapabilities) -> Vec<AuthMethod> {
532    let mut methods = vec![AuthMethod::Agent(
533        AuthMethodAgent::new(CLAUDE_CODE_LOGIN_AUTH_ID, "Use Claude Code login").description(
534            "Uses credentials managed by the installed Claude Code CLI. Run `claude` in a terminal first if authentication is missing.",
535        ),
536    )];
537
538    if client_capabilities.auth.terminal {
539        methods.push(AuthMethod::Terminal(
540            AuthMethodTerminal::new(CLAUDE_CODE_TERMINAL_AUTH_ID, "Open Claude Code login")
541                .description(
542                    "Starts the adapter's interactive pass-through so Claude Code login can be completed in a terminal.",
543                )
544                .args(vec!["interactive".to_string()]),
545        ));
546    }
547
548    methods
549}
550
551fn requested_session_id(
552    meta: Option<&serde_json::Map<String, serde_json::Value>>,
553) -> Option<SessionId> {
554    let meta = meta?;
555    meta.get("claudeCode")
556        .and_then(|value| value.get("sessionId"))
557        .or_else(|| meta.get("sessionId"))
558        .or_else(|| meta.get("session_id"))
559        .and_then(serde_json::Value::as_str)
560        .filter(|value| !value.trim().is_empty())
561        .map(|value| SessionId::new(value.to_string()))
562}
563
564impl Default for AcpServer {
565    fn default() -> Self {
566        Self::new()
567    }
568}
569
570fn internal_error(err: anyhow::Error) -> Error {
571    Error::internal_error().data(err.to_string())
572}