Skip to main content

koda_cli/
acp_adapter.rs

1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
/// Outgoing messages from the ACP adapter — either session notifications or
/// permission requests (which are JSON-RPC requests the *agent* sends to the *client*).
#[derive(Debug, Clone)]
pub enum AcpOutgoing {
    /// A session update produced from an engine event.
    Notification(acp::SessionNotification),
    /// A permission request that needs an answer from the client.
    PermissionRequest {
        /// JSON-RPC request id; `AcpSink` stores it as the key of the pending
        /// approval so the matching response can be routed back to the engine.
        rpc_id: acp::RequestId,
        /// The permission request payload (session id, tool call, options).
        request: acp::RequestPermissionRequest,
    },
}
19
20/// Maps a Koda tool name to the ACP `ToolKind` enum.
21pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22    match name {
23        "Read" => acp::ToolKind::Read,
24        "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25        "Bash" | "Shell" => acp::ToolKind::Execute,
26        "Grep" | "Glob" => acp::ToolKind::Search,
27        "Delete" => acp::ToolKind::Delete,
28        "WebFetch" => acp::ToolKind::Fetch,
29        "Think" => acp::ToolKind::Think,
30        _ => acp::ToolKind::Other,
31    }
32}
33
34/// Translates an internal `EngineEvent` to an ACP `SessionNotification`.
35///
36/// Returns `None` for events that have no ACP equivalent (UI-only signals)
37/// or that are handled specially (e.g. `ApprovalRequest`).
38pub fn engine_event_to_acp(
39    event: &EngineEvent,
40    session_id: &str,
41) -> Option<acp::SessionNotification> {
42    match event {
43        EngineEvent::TextDelta { text } => {
44            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45            Some(acp::SessionNotification::new(
46                session_id.to_string(),
47                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48            ))
49        }
50        EngineEvent::TextDone => None,
51        EngineEvent::ThinkingStart => None,
52        EngineEvent::ThinkingDelta { text } => {
53            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54            Some(acp::SessionNotification::new(
55                session_id.to_string(),
56                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57            ))
58        }
59        EngineEvent::ThinkingDone => None,
60        EngineEvent::ResponseStart => None,
61
62        EngineEvent::ToolCallStart { id, name, args, .. } => {
63            let tc = acp::ToolCall::new(id.clone(), name.clone())
64                .kind(map_tool_kind(name))
65                .status(acp::ToolCallStatus::InProgress)
66                .raw_input(Some(args.clone()));
67            Some(acp::SessionNotification::new(
68                session_id.to_string(),
69                acp::SessionUpdate::ToolCall(tc),
70            ))
71        }
72
73        EngineEvent::ToolCallResult {
74            id,
75            name: _,
76            output,
77        } => {
78            let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79                acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80            ))];
81            let fields = acp::ToolCallUpdateFields::new()
82                .status(acp::ToolCallStatus::Completed)
83                .content(content);
84            let update = acp::ToolCallUpdate::new(id.clone(), fields);
85            Some(acp::SessionNotification::new(
86                session_id.to_string(),
87                acp::SessionUpdate::ToolCallUpdate(update),
88            ))
89        }
90
91        EngineEvent::SubAgentStart { agent_name } => {
92            let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93                .kind(acp::ToolKind::Other)
94                .status(acp::ToolCallStatus::InProgress);
95            Some(acp::SessionNotification::new(
96                session_id.to_string(),
97                acp::SessionUpdate::ToolCall(tc),
98            ))
99        }
100
101        // Handled specially by AcpSink (bidirectional permission flow)
102        EngineEvent::ApprovalRequest { .. } => None,
103
104        EngineEvent::ActionBlocked {
105            tool_name: _,
106            detail,
107            ..
108        } => {
109            let fields = acp::ToolCallUpdateFields::new()
110                .status(acp::ToolCallStatus::Failed)
111                .title(format!("Blocked: {detail}"));
112            let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
113            Some(acp::SessionNotification::new(
114                session_id.to_string(),
115                acp::SessionUpdate::ToolCallUpdate(update),
116            ))
117        }
118
119        EngineEvent::StatusUpdate { .. } => None,
120        EngineEvent::Footer { .. } => None,
121        EngineEvent::SpinnerStart { .. } => None,
122        EngineEvent::SpinnerStop => None,
123        EngineEvent::TurnStart { .. } => None,
124        EngineEvent::TurnEnd { .. } => None,
125        EngineEvent::LoopCapReached { .. } => None,
126
127        EngineEvent::Info { message } => {
128            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
129            Some(acp::SessionNotification::new(
130                session_id.to_string(),
131                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
132            ))
133        }
134        EngineEvent::Warn { message } => {
135            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
136            Some(acp::SessionNotification::new(
137                session_id.to_string(),
138                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
139            ))
140        }
141        EngineEvent::Error { message } => {
142            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
143            Some(acp::SessionNotification::new(
144                session_id.to_string(),
145                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
146            ))
147        }
148    }
149}
150
/// Pending approval context: maps an outgoing JSON-RPC request ID back to the
/// engine approval ID so we can route the client's response correctly.
///
/// Instances are stored as the values of the pending-approvals map, keyed by
/// the JSON-RPC request id of the permission request that was sent out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingApproval {
    /// Identifier the engine assigned to the approval request; echoed back in
    /// the `EngineCommand::ApprovalResponse` once the client answers.
    pub engine_approval_id: String,
}
156
157/// ACP sink that translates EngineEvents to ACP messages and handles
158/// the bidirectional approval flow.
159pub struct AcpSink {
160    session_id: String,
161    tx: mpsc::Sender<AcpOutgoing>,
162    /// Kept for future bidirectional approval flow where the server reads
163    /// permission responses from stdin and routes them back to the engine.
164    #[allow(dead_code)]
165    cmd_tx: mpsc::Sender<EngineCommand>,
166    pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
167    next_rpc_id: Arc<AtomicI64>,
168}
169
170impl AcpSink {
171    pub fn new(
172        session_id: String,
173        tx: mpsc::Sender<AcpOutgoing>,
174        cmd_tx: mpsc::Sender<EngineCommand>,
175        pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
176        next_rpc_id: Arc<AtomicI64>,
177    ) -> Self {
178        Self {
179            session_id,
180            tx,
181            cmd_tx,
182            pending_approvals,
183            next_rpc_id,
184        }
185    }
186}
187
188impl EngineSink for AcpSink {
189    fn emit(&self, event: EngineEvent) {
190        // Handle approval requests specially — they become outgoing JSON-RPC requests
191        if let EngineEvent::ApprovalRequest {
192            ref id,
193            ref tool_name,
194            ref detail,
195            ..
196        } = event
197        {
198            let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
199            let rpc_id = acp::RequestId::Number(rpc_id_num);
200
201            // Build the permission request
202            let tc_fields = acp::ToolCallUpdateFields::new()
203                .status(acp::ToolCallStatus::Pending)
204                .title(detail.clone());
205            let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
206
207            let options = vec![
208                acp::PermissionOption::new(
209                    "approve",
210                    "Approve",
211                    acp::PermissionOptionKind::AllowOnce,
212                ),
213                acp::PermissionOption::new(
214                    "reject",
215                    "Reject",
216                    acp::PermissionOptionKind::RejectOnce,
217                ),
218                acp::PermissionOption::new(
219                    "always_allow",
220                    "Always Allow",
221                    acp::PermissionOptionKind::AllowAlways,
222                ),
223            ];
224
225            let request =
226                acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
227
228            // Store mapping so we can route the response back
229            self.pending_approvals.lock().unwrap().insert(
230                rpc_id.clone(),
231                PendingApproval {
232                    engine_approval_id: id.clone(),
233                },
234            );
235
236            let _ = self
237                .tx
238                .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
239            return;
240        }
241
242        // Handle loop cap — server always auto-continues
243        if matches!(event, EngineEvent::LoopCapReached { .. }) {
244            let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
245                action: koda_core::loop_guard::LoopContinuation::Continue200,
246            });
247            return;
248        }
249
250        // All other events go through the standard mapping
251        if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
252            let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
253        }
254    }
255}
256
257/// Resolve an ACP permission response to an engine approval command.
258/// Returns the `EngineCommand::ApprovalResponse` if the RPC ID matches a pending approval.
259pub fn resolve_permission_response(
260    pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
261    rpc_id: &acp::RequestId,
262    outcome: &acp::RequestPermissionOutcome,
263    cmd_tx: &mpsc::Sender<EngineCommand>,
264) -> bool {
265    let pending = pending_approvals.lock().unwrap().remove(rpc_id);
266    if let Some(approval) = pending {
267        let decision = match outcome {
268            acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
269            acp::RequestPermissionOutcome::Selected(selected) => {
270                match selected.option_id.0.as_ref() {
271                    "approve" => ApprovalDecision::Approve,
272                    _ => ApprovalDecision::Reject,
273                }
274            }
275            _ => ApprovalDecision::Reject,
276        };
277        let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
278            id: approval.engine_approval_id,
279            decision,
280        });
281        true
282    } else {
283        false
284    }
285}
286
/// Unit tests for the event → ACP mapping and the tool-kind classification.
#[cfg(test)]
mod tests {
    use super::*;

    /// A text delta is forwarded as an agent message chunk for the session.
    #[test]
    fn test_text_delta() {
        let event = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let notification = engine_event_to_acp(&event, "session-1").unwrap();

        assert_eq!(notification.session_id, "session-1".to_string().into());
        match notification.update {
            acp::SessionUpdate::AgentMessageChunk(chunk) => {
                let block = chunk.content;
                match block {
                    acp::ContentBlock::Text(text_content) => {
                        assert_eq!(text_content.text, "hello");
                    }
                    _ => panic!("Expected text block"),
                }
            }
            _ => panic!("Expected AgentMessageChunk"),
        }
    }

    /// A thinking delta is forwarded as an agent thought chunk.
    #[test]
    fn test_thinking_delta() {
        let event = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
                acp::ContentBlock::Text(tc) => assert_eq!(tc.text, "reasoning..."),
                _ => panic!("Expected text block"),
            },
            _ => panic!("Expected AgentThoughtChunk"),
        }
    }

    /// Tool-call start carries the id, title, mapped kind and in-progress status.
    #[test]
    fn test_tool_call_start() {
        let event = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(tc.title, "Bash");
                assert_eq!(tc.kind, acp::ToolKind::Execute);
                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    /// A tool result becomes a completed tool-call update for the same id.
    #[test]
    fn test_tool_call_result() {
        let event = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// A sub-agent start is surfaced as a tool call keyed by the agent name.
    #[test]
    fn test_sub_agent_start() {
        let event = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(tc.kind, acp::ToolKind::Other);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    /// A blocked action is reported as a failed update with a "Blocked:" title.
    #[test]
    fn test_action_blocked() {
        let event = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// Info/warn/error messages are sent as message chunks with a level prefix.
    #[test]
    fn test_info_warn_error() {
        for (event, prefix) in [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ] {
            let notification = engine_event_to_acp(&event, "s1").unwrap();
            match notification.update {
                acp::SessionUpdate::AgentMessageChunk(chunk) => match chunk.content {
                    acp::ContentBlock::Text(tc) => assert!(tc.text.starts_with(prefix)),
                    _ => panic!("Expected text block"),
                },
                _ => panic!("Expected AgentMessageChunk"),
            }
        }
    }

    /// Events without an ACP counterpart map to `None`.
    #[test]
    fn test_none_events() {
        let none_events = vec![
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
            EngineEvent::TurnStart {
                turn_id: "t1".into(),
            },
            EngineEvent::TurnEnd {
                turn_id: "t1".into(),
                reason: koda_core::engine::event::TurnEndReason::Complete,
            },
            EngineEvent::LoopCapReached {
                cap: 200,
                recent_tools: vec![],
            },
        ];
        for event in none_events {
            assert!(
                engine_event_to_acp(&event, "s1").is_none(),
                "Expected None for {event:?}"
            );
        }
    }

    /// Every arm of `map_tool_kind` is exercised, including the aliases.
    #[test]
    fn test_map_tool_kind() {
        assert_eq!(map_tool_kind("Read"), acp::ToolKind::Read);
        assert_eq!(map_tool_kind("Write"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Edit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("NotebookEdit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Bash"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Shell"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Grep"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Glob"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Delete"), acp::ToolKind::Delete);
        assert_eq!(map_tool_kind("WebFetch"), acp::ToolKind::Fetch);
        assert_eq!(map_tool_kind("Think"), acp::ToolKind::Think);
        assert_eq!(map_tool_kind("Unknown"), acp::ToolKind::Other);
    }
}