Skip to main content

koda_cli/
acp_adapter.rs

1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9/// Outgoing messages from the ACP adapter — either session notifications or
10/// permission requests (which are JSON-RPC requests the *agent* sends to the *client*).
11#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13    Notification(acp::SessionNotification),
14    PermissionRequest {
15        rpc_id: acp::RequestId,
16        request: acp::RequestPermissionRequest,
17    },
18}
19
20/// Maps a Koda tool name to the ACP `ToolKind` enum.
21pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22    match name {
23        "Read" => acp::ToolKind::Read,
24        "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25        "Bash" | "Shell" => acp::ToolKind::Execute,
26        "Grep" | "Glob" => acp::ToolKind::Search,
27        "Delete" => acp::ToolKind::Delete,
28        "WebFetch" => acp::ToolKind::Fetch,
29        "Think" => acp::ToolKind::Think,
30        _ => acp::ToolKind::Other,
31    }
32}
33
34/// Translates an internal `EngineEvent` to an ACP `SessionNotification`.
35///
36/// Returns `None` for events that have no ACP equivalent (UI-only signals)
37/// or that are handled specially (e.g. `ApprovalRequest`).
38pub fn engine_event_to_acp(
39    event: &EngineEvent,
40    session_id: &str,
41) -> Option<acp::SessionNotification> {
42    match event {
43        EngineEvent::TextDelta { text } => {
44            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45            Some(acp::SessionNotification::new(
46                session_id.to_string(),
47                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48            ))
49        }
50        EngineEvent::TextDone => None,
51        EngineEvent::ThinkingStart => None,
52        EngineEvent::ThinkingDelta { text } => {
53            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54            Some(acp::SessionNotification::new(
55                session_id.to_string(),
56                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57            ))
58        }
59        EngineEvent::ThinkingDone => None,
60        EngineEvent::ResponseStart => None,
61
62        EngineEvent::ToolCallStart { id, name, args, .. } => {
63            let tc = acp::ToolCall::new(id.clone(), name.clone())
64                .kind(map_tool_kind(name))
65                .status(acp::ToolCallStatus::InProgress)
66                .raw_input(Some(args.clone()));
67            Some(acp::SessionNotification::new(
68                session_id.to_string(),
69                acp::SessionUpdate::ToolCall(tc),
70            ))
71        }
72
73        EngineEvent::ToolCallResult {
74            id,
75            name: _,
76            output,
77        } => {
78            let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79                acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80            ))];
81            let fields = acp::ToolCallUpdateFields::new()
82                .status(acp::ToolCallStatus::Completed)
83                .content(content);
84            let update = acp::ToolCallUpdate::new(id.clone(), fields);
85            Some(acp::SessionNotification::new(
86                session_id.to_string(),
87                acp::SessionUpdate::ToolCallUpdate(update),
88            ))
89        }
90
91        EngineEvent::SubAgentStart { agent_name } => {
92            let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93                .kind(acp::ToolKind::Other)
94                .status(acp::ToolCallStatus::InProgress);
95            Some(acp::SessionNotification::new(
96                session_id.to_string(),
97                acp::SessionUpdate::ToolCall(tc),
98            ))
99        }
100
101        EngineEvent::SubAgentEnd { agent_name } => {
102            let fields = acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed);
103            let update = acp::ToolCallUpdate::new(agent_name.clone(), fields);
104            Some(acp::SessionNotification::new(
105                session_id.to_string(),
106                acp::SessionUpdate::ToolCallUpdate(update),
107            ))
108        }
109
110        // Handled specially by AcpSink (bidirectional permission flow)
111        EngineEvent::ApprovalRequest { .. } => None,
112
113        EngineEvent::ActionBlocked {
114            tool_name: _,
115            detail,
116            ..
117        } => {
118            let fields = acp::ToolCallUpdateFields::new()
119                .status(acp::ToolCallStatus::Failed)
120                .title(format!("Blocked: {detail}"));
121            let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
122            Some(acp::SessionNotification::new(
123                session_id.to_string(),
124                acp::SessionUpdate::ToolCallUpdate(update),
125            ))
126        }
127
128        EngineEvent::StatusUpdate { .. } => None,
129        EngineEvent::Footer { .. } => None,
130        EngineEvent::SpinnerStart { .. } => None,
131        EngineEvent::SpinnerStop => None,
132
133        EngineEvent::Info { message } => {
134            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
135            Some(acp::SessionNotification::new(
136                session_id.to_string(),
137                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
138            ))
139        }
140        EngineEvent::Warn { message } => {
141            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
142            Some(acp::SessionNotification::new(
143                session_id.to_string(),
144                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
145            ))
146        }
147        EngineEvent::Error { message } => {
148            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
149            Some(acp::SessionNotification::new(
150                session_id.to_string(),
151                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
152            ))
153        }
154    }
155}
156
/// Context remembered for a permission request that is still awaiting the
/// client's answer.
///
/// Stored under the outgoing JSON-RPC request id so that, when the client
/// responds, the answer can be routed back to the engine approval it belongs to.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the pending-approval
/// table can be logged, snapshotted and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingApproval {
    /// Id the engine assigned to the approval request.
    pub engine_approval_id: String,
}
162
163/// ACP sink that translates EngineEvents to ACP messages and handles
164/// the bidirectional approval flow.
165pub struct AcpSink {
166    session_id: String,
167    tx: mpsc::Sender<AcpOutgoing>,
168    /// Kept for future bidirectional approval flow where the server reads
169    /// permission responses from stdin and routes them back to the engine.
170    #[allow(dead_code)]
171    cmd_tx: mpsc::Sender<EngineCommand>,
172    pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
173    next_rpc_id: Arc<AtomicI64>,
174}
175
176impl AcpSink {
177    pub fn new(
178        session_id: String,
179        tx: mpsc::Sender<AcpOutgoing>,
180        cmd_tx: mpsc::Sender<EngineCommand>,
181        pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
182        next_rpc_id: Arc<AtomicI64>,
183    ) -> Self {
184        Self {
185            session_id,
186            tx,
187            cmd_tx,
188            pending_approvals,
189            next_rpc_id,
190        }
191    }
192}
193
194impl EngineSink for AcpSink {
195    fn emit(&self, event: EngineEvent) {
196        // Handle approval requests specially — they become outgoing JSON-RPC requests
197        if let EngineEvent::ApprovalRequest {
198            ref id,
199            ref tool_name,
200            ref detail,
201            ..
202        } = event
203        {
204            let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
205            let rpc_id = acp::RequestId::Number(rpc_id_num);
206
207            // Build the permission request
208            let tc_fields = acp::ToolCallUpdateFields::new()
209                .status(acp::ToolCallStatus::Pending)
210                .title(detail.clone());
211            let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
212
213            let options = vec![
214                acp::PermissionOption::new(
215                    "approve",
216                    "Approve",
217                    acp::PermissionOptionKind::AllowOnce,
218                ),
219                acp::PermissionOption::new(
220                    "reject",
221                    "Reject",
222                    acp::PermissionOptionKind::RejectOnce,
223                ),
224                acp::PermissionOption::new(
225                    "always_allow",
226                    "Always Allow",
227                    acp::PermissionOptionKind::AllowAlways,
228                ),
229            ];
230
231            let request =
232                acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
233
234            // Store mapping so we can route the response back
235            self.pending_approvals.lock().unwrap().insert(
236                rpc_id.clone(),
237                PendingApproval {
238                    engine_approval_id: id.clone(),
239                },
240            );
241
242            let _ = self
243                .tx
244                .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
245            return;
246        }
247
248        // All other events go through the standard mapping
249        if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
250            let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
251        }
252    }
253}
254
255/// Resolve an ACP permission response to an engine approval command.
256/// Returns the `EngineCommand::ApprovalResponse` if the RPC ID matches a pending approval.
257pub fn resolve_permission_response(
258    pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
259    rpc_id: &acp::RequestId,
260    outcome: &acp::RequestPermissionOutcome,
261    cmd_tx: &mpsc::Sender<EngineCommand>,
262) -> bool {
263    let pending = pending_approvals.lock().unwrap().remove(rpc_id);
264    if let Some(approval) = pending {
265        let decision = match outcome {
266            acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
267            acp::RequestPermissionOutcome::Selected(selected) => {
268                match selected.option_id.0.as_ref() {
269                    "approve" => ApprovalDecision::Approve,
270                    "always_allow" => ApprovalDecision::AlwaysAllow,
271                    _ => ApprovalDecision::Reject,
272                }
273            }
274            _ => ApprovalDecision::Reject,
275        };
276        let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
277            id: approval.engine_approval_id,
278            decision,
279        });
280        true
281    } else {
282        false
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Translates `event` for session `"s1"` and returns only the update payload.
    fn update_for(event: &EngineEvent) -> acp::SessionUpdate {
        engine_event_to_acp(event, "s1").unwrap().update
    }

    /// Asserts that `block` is a text block whose text equals `expected`.
    fn assert_text(block: acp::ContentBlock, expected: &str) {
        match block {
            acp::ContentBlock::Text(body) => assert_eq!(body.text, expected),
            _ => panic!("Expected text block"),
        }
    }

    // Text deltas become agent message chunks addressed to the given session.
    #[test]
    fn test_text_delta() {
        let delta = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let note = engine_event_to_acp(&delta, "session-1").unwrap();

        assert_eq!(note.session_id, "session-1".to_string().into());
        match note.update {
            acp::SessionUpdate::AgentMessageChunk(chunk) => assert_text(chunk.content, "hello"),
            _ => panic!("Expected AgentMessageChunk"),
        }
    }

    // Thinking deltas become agent thought chunks.
    #[test]
    fn test_thinking_delta() {
        let delta = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        match update_for(&delta) {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => {
                assert_text(chunk.content, "reasoning...")
            }
            _ => panic!("Expected AgentThoughtChunk"),
        }
    }

    // A starting tool call carries id, title, mapped kind and in-progress status.
    #[test]
    fn test_tool_call_start() {
        let start = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        match update_for(&start) {
            acp::SessionUpdate::ToolCall(call) => {
                assert_eq!(call.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(call.title, "Bash");
                assert_eq!(call.kind, acp::ToolKind::Execute);
                assert_eq!(call.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    // A tool result completes the tool call it belongs to.
    #[test]
    fn test_tool_call_result() {
        let result = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        match update_for(&result) {
            acp::SessionUpdate::ToolCallUpdate(upd) => {
                assert_eq!(upd.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(upd.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    // Sub-agent start is reported as a tool call keyed by the agent name.
    #[test]
    fn test_sub_agent_start() {
        let start = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        match update_for(&start) {
            acp::SessionUpdate::ToolCall(call) => {
                assert_eq!(call.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(call.kind, acp::ToolKind::Other);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    // Sub-agent end is reported as a completed tool-call update.
    #[test]
    fn test_sub_agent_end() {
        let end = EngineEvent::SubAgentEnd {
            agent_name: "reviewer".into(),
        };
        match update_for(&end) {
            acp::SessionUpdate::ToolCallUpdate(upd) => {
                assert_eq!(upd.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    // A blocked action is reported as a failed update titled with the detail.
    #[test]
    fn test_action_blocked() {
        let blocked = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        match update_for(&blocked) {
            acp::SessionUpdate::ToolCallUpdate(upd) => {
                assert_eq!(upd.fields.status, Some(acp::ToolCallStatus::Failed));
                assert_eq!(upd.fields.title, Some("Blocked: rm -rf /".to_string()));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    // Info/Warn/Error become message chunks carrying their severity prefix.
    #[test]
    fn test_info_warn_error() {
        let cases = [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ];

        for (event, prefix) in cases {
            let chunk = match update_for(&event) {
                acp::SessionUpdate::AgentMessageChunk(chunk) => chunk,
                _ => panic!("Expected AgentMessageChunk"),
            };
            match chunk.content {
                acp::ContentBlock::Text(body) => assert!(body.text.starts_with(prefix)),
                _ => panic!("Expected text block"),
            }
        }
    }

    // Events without an ACP equivalent must translate to `None`.
    #[test]
    fn test_none_events() {
        let silent = vec![
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
                whitelist_hint: None,
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
        ];

        for event in silent.iter() {
            let translated = engine_event_to_acp(event, "s1");
            assert!(translated.is_none(), "Expected None for {event:?}");
        }
    }

    // Spot-check the tool-name table, including the fallback.
    #[test]
    fn test_map_tool_kind() {
        let expectations = [
            ("Read", acp::ToolKind::Read),
            ("Write", acp::ToolKind::Edit),
            ("Edit", acp::ToolKind::Edit),
            ("Bash", acp::ToolKind::Execute),
            ("Grep", acp::ToolKind::Search),
            ("Glob", acp::ToolKind::Search),
            ("Delete", acp::ToolKind::Delete),
            ("WebFetch", acp::ToolKind::Fetch),
            ("Think", acp::ToolKind::Think),
            ("Unknown", acp::ToolKind::Other),
        ];

        for (tool, kind) in expectations {
            assert_eq!(map_tool_kind(tool), kind);
        }
    }
}