Skip to main content

koda_cli/
acp_adapter.rs

1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9/// Outgoing messages from the ACP adapter — either session notifications or
10/// permission requests (which are JSON-RPC requests the *agent* sends to the *client*).
11#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13    Notification(acp::SessionNotification),
14    PermissionRequest {
15        rpc_id: acp::RequestId,
16        request: acp::RequestPermissionRequest,
17    },
18}
19
20/// Maps a Koda tool name to the ACP `ToolKind` enum.
21pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22    match name {
23        "Read" => acp::ToolKind::Read,
24        "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25        "Bash" | "Shell" => acp::ToolKind::Execute,
26        "Grep" | "Glob" => acp::ToolKind::Search,
27        "Delete" => acp::ToolKind::Delete,
28        "WebFetch" => acp::ToolKind::Fetch,
29        "Think" => acp::ToolKind::Think,
30        _ => acp::ToolKind::Other,
31    }
32}
33
34/// Translates an internal `EngineEvent` to an ACP `SessionNotification`.
35///
36/// Returns `None` for events that have no ACP equivalent (UI-only signals)
37/// or that are handled specially (e.g. `ApprovalRequest`).
38pub fn engine_event_to_acp(
39    event: &EngineEvent,
40    session_id: &str,
41) -> Option<acp::SessionNotification> {
42    match event {
43        EngineEvent::TextDelta { text } => {
44            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45            Some(acp::SessionNotification::new(
46                session_id.to_string(),
47                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48            ))
49        }
50        EngineEvent::TextDone => None,
51        EngineEvent::ThinkingStart => None,
52        EngineEvent::ThinkingDelta { text } => {
53            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54            Some(acp::SessionNotification::new(
55                session_id.to_string(),
56                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57            ))
58        }
59        EngineEvent::ThinkingDone => None,
60        EngineEvent::ResponseStart => None,
61
62        EngineEvent::ToolCallStart { id, name, args, .. } => {
63            let tc = acp::ToolCall::new(id.clone(), name.clone())
64                .kind(map_tool_kind(name))
65                .status(acp::ToolCallStatus::InProgress)
66                .raw_input(Some(args.clone()));
67            Some(acp::SessionNotification::new(
68                session_id.to_string(),
69                acp::SessionUpdate::ToolCall(tc),
70            ))
71        }
72
73        EngineEvent::ToolCallResult {
74            id,
75            name: _,
76            output,
77        } => {
78            let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79                acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80            ))];
81            let fields = acp::ToolCallUpdateFields::new()
82                .status(acp::ToolCallStatus::Completed)
83                .content(content);
84            let update = acp::ToolCallUpdate::new(id.clone(), fields);
85            Some(acp::SessionNotification::new(
86                session_id.to_string(),
87                acp::SessionUpdate::ToolCallUpdate(update),
88            ))
89        }
90
91        EngineEvent::SubAgentStart { agent_name } => {
92            let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93                .kind(acp::ToolKind::Other)
94                .status(acp::ToolCallStatus::InProgress);
95            Some(acp::SessionNotification::new(
96                session_id.to_string(),
97                acp::SessionUpdate::ToolCall(tc),
98            ))
99        }
100
101        EngineEvent::SubAgentEnd { agent_name } => {
102            let fields = acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed);
103            let update = acp::ToolCallUpdate::new(agent_name.clone(), fields);
104            Some(acp::SessionNotification::new(
105                session_id.to_string(),
106                acp::SessionUpdate::ToolCallUpdate(update),
107            ))
108        }
109
110        // Handled specially by AcpSink (bidirectional permission flow)
111        EngineEvent::ApprovalRequest { .. } => None,
112
113        EngineEvent::ActionBlocked {
114            tool_name: _,
115            detail,
116            ..
117        } => {
118            let fields = acp::ToolCallUpdateFields::new()
119                .status(acp::ToolCallStatus::Failed)
120                .title(format!("Blocked: {detail}"));
121            let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
122            Some(acp::SessionNotification::new(
123                session_id.to_string(),
124                acp::SessionUpdate::ToolCallUpdate(update),
125            ))
126        }
127
128        EngineEvent::StatusUpdate { .. } => None,
129        EngineEvent::Footer { .. } => None,
130        EngineEvent::SpinnerStart { .. } => None,
131        EngineEvent::SpinnerStop => None,
132        EngineEvent::TodoDisplay { .. } => None,
133        EngineEvent::TurnStart { .. } => None,
134        EngineEvent::TurnEnd { .. } => None,
135        EngineEvent::LoopCapReached { .. } => None,
136
137        EngineEvent::Info { message } => {
138            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
139            Some(acp::SessionNotification::new(
140                session_id.to_string(),
141                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
142            ))
143        }
144        EngineEvent::Warn { message } => {
145            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
146            Some(acp::SessionNotification::new(
147                session_id.to_string(),
148                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
149            ))
150        }
151        EngineEvent::Error { message } => {
152            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
153            Some(acp::SessionNotification::new(
154                session_id.to_string(),
155                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
156            ))
157        }
158    }
159}
160
161/// Pending approval context: maps an outgoing JSON-RPC request ID back to the
162/// engine approval ID so we can route the client's response correctly.
163pub struct PendingApproval {
164    pub engine_approval_id: String,
165}
166
167/// ACP sink that translates EngineEvents to ACP messages and handles
168/// the bidirectional approval flow.
169pub struct AcpSink {
170    session_id: String,
171    tx: mpsc::Sender<AcpOutgoing>,
172    /// Kept for future bidirectional approval flow where the server reads
173    /// permission responses from stdin and routes them back to the engine.
174    #[allow(dead_code)]
175    cmd_tx: mpsc::Sender<EngineCommand>,
176    pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
177    next_rpc_id: Arc<AtomicI64>,
178}
179
180impl AcpSink {
181    pub fn new(
182        session_id: String,
183        tx: mpsc::Sender<AcpOutgoing>,
184        cmd_tx: mpsc::Sender<EngineCommand>,
185        pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
186        next_rpc_id: Arc<AtomicI64>,
187    ) -> Self {
188        Self {
189            session_id,
190            tx,
191            cmd_tx,
192            pending_approvals,
193            next_rpc_id,
194        }
195    }
196}
197
198impl EngineSink for AcpSink {
199    fn emit(&self, event: EngineEvent) {
200        // Handle approval requests specially — they become outgoing JSON-RPC requests
201        if let EngineEvent::ApprovalRequest {
202            ref id,
203            ref tool_name,
204            ref detail,
205            ..
206        } = event
207        {
208            let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
209            let rpc_id = acp::RequestId::Number(rpc_id_num);
210
211            // Build the permission request
212            let tc_fields = acp::ToolCallUpdateFields::new()
213                .status(acp::ToolCallStatus::Pending)
214                .title(detail.clone());
215            let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
216
217            let options = vec![
218                acp::PermissionOption::new(
219                    "approve",
220                    "Approve",
221                    acp::PermissionOptionKind::AllowOnce,
222                ),
223                acp::PermissionOption::new(
224                    "reject",
225                    "Reject",
226                    acp::PermissionOptionKind::RejectOnce,
227                ),
228                acp::PermissionOption::new(
229                    "always_allow",
230                    "Always Allow",
231                    acp::PermissionOptionKind::AllowAlways,
232                ),
233            ];
234
235            let request =
236                acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
237
238            // Store mapping so we can route the response back
239            self.pending_approvals.lock().unwrap().insert(
240                rpc_id.clone(),
241                PendingApproval {
242                    engine_approval_id: id.clone(),
243                },
244            );
245
246            let _ = self
247                .tx
248                .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
249            return;
250        }
251
252        // Handle loop cap — server always auto-continues
253        if matches!(event, EngineEvent::LoopCapReached { .. }) {
254            let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
255                action: koda_core::loop_guard::LoopContinuation::Continue200,
256            });
257            return;
258        }
259
260        // All other events go through the standard mapping
261        if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
262            let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
263        }
264    }
265}
266
267/// Resolve an ACP permission response to an engine approval command.
268/// Returns the `EngineCommand::ApprovalResponse` if the RPC ID matches a pending approval.
269pub fn resolve_permission_response(
270    pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
271    rpc_id: &acp::RequestId,
272    outcome: &acp::RequestPermissionOutcome,
273    cmd_tx: &mpsc::Sender<EngineCommand>,
274) -> bool {
275    let pending = pending_approvals.lock().unwrap().remove(rpc_id);
276    if let Some(approval) = pending {
277        let decision = match outcome {
278            acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
279            acp::RequestPermissionOutcome::Selected(selected) => {
280                match selected.option_id.0.as_ref() {
281                    "approve" => ApprovalDecision::Approve,
282                    "always_allow" => ApprovalDecision::AlwaysAllow,
283                    _ => ApprovalDecision::Reject,
284                }
285            }
286            _ => ApprovalDecision::Reject,
287        };
288        let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
289            id: approval.engine_approval_id,
290            decision,
291        });
292        true
293    } else {
294        false
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301
302    #[test]
303    fn test_text_delta() {
304        let event = EngineEvent::TextDelta {
305            text: "hello".into(),
306        };
307        let acp = engine_event_to_acp(&event, "session-1").unwrap();
308
309        assert_eq!(acp.session_id, "session-1".to_string().into());
310        match acp.update {
311            acp::SessionUpdate::AgentMessageChunk(chunk) => {
312                let block = chunk.content;
313                match block {
314                    acp::ContentBlock::Text(text_content) => {
315                        assert_eq!(text_content.text, "hello");
316                    }
317                    _ => panic!("Expected text block"),
318                }
319            }
320            _ => panic!("Expected AgentMessageChunk"),
321        }
322    }
323
324    #[test]
325    fn test_thinking_delta() {
326        let event = EngineEvent::ThinkingDelta {
327            text: "reasoning...".into(),
328        };
329        let acp = engine_event_to_acp(&event, "s1").unwrap();
330        match acp.update {
331            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
332                acp::ContentBlock::Text(tc) => assert_eq!(tc.text, "reasoning..."),
333                _ => panic!("Expected text block"),
334            },
335            _ => panic!("Expected AgentThoughtChunk"),
336        }
337    }
338
339    #[test]
340    fn test_tool_call_start() {
341        let event = EngineEvent::ToolCallStart {
342            id: "call_1".into(),
343            name: "Bash".into(),
344            args: serde_json::json!({"command": "ls"}),
345            is_sub_agent: false,
346        };
347        let acp = engine_event_to_acp(&event, "s1").unwrap();
348        match acp.update {
349            acp::SessionUpdate::ToolCall(tc) => {
350                assert_eq!(tc.tool_call_id.0.as_ref(), "call_1");
351                assert_eq!(tc.title, "Bash");
352                assert_eq!(tc.kind, acp::ToolKind::Execute);
353                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
354            }
355            _ => panic!("Expected ToolCall"),
356        }
357    }
358
359    #[test]
360    fn test_tool_call_result() {
361        let event = EngineEvent::ToolCallResult {
362            id: "call_1".into(),
363            name: "Read".into(),
364            output: "file contents".into(),
365        };
366        let acp = engine_event_to_acp(&event, "s1").unwrap();
367        match acp.update {
368            acp::SessionUpdate::ToolCallUpdate(update) => {
369                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
370                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
371            }
372            _ => panic!("Expected ToolCallUpdate"),
373        }
374    }
375
376    #[test]
377    fn test_sub_agent_start() {
378        let event = EngineEvent::SubAgentStart {
379            agent_name: "reviewer".into(),
380        };
381        let acp = engine_event_to_acp(&event, "s1").unwrap();
382        match acp.update {
383            acp::SessionUpdate::ToolCall(tc) => {
384                assert_eq!(tc.tool_call_id.0.as_ref(), "reviewer");
385                assert_eq!(tc.kind, acp::ToolKind::Other);
386            }
387            _ => panic!("Expected ToolCall"),
388        }
389    }
390
391    #[test]
392    fn test_sub_agent_end() {
393        let event = EngineEvent::SubAgentEnd {
394            agent_name: "reviewer".into(),
395        };
396        let acp = engine_event_to_acp(&event, "s1").unwrap();
397        match acp.update {
398            acp::SessionUpdate::ToolCallUpdate(update) => {
399                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
400            }
401            _ => panic!("Expected ToolCallUpdate"),
402        }
403    }
404
405    #[test]
406    fn test_action_blocked() {
407        let event = EngineEvent::ActionBlocked {
408            tool_name: "Bash".into(),
409            detail: "rm -rf /".into(),
410            preview: None,
411        };
412        let acp = engine_event_to_acp(&event, "s1").unwrap();
413        match acp.update {
414            acp::SessionUpdate::ToolCallUpdate(update) => {
415                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
416                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
417            }
418            _ => panic!("Expected ToolCallUpdate"),
419        }
420    }
421
422    #[test]
423    fn test_info_warn_error() {
424        for (event, prefix) in [
425            (
426                EngineEvent::Info {
427                    message: "hello".into(),
428                },
429                "[info]",
430            ),
431            (
432                EngineEvent::Warn {
433                    message: "watch out".into(),
434                },
435                "[warn]",
436            ),
437            (
438                EngineEvent::Error {
439                    message: "oops".into(),
440                },
441                "[error]",
442            ),
443        ] {
444            let acp = engine_event_to_acp(&event, "s1").unwrap();
445            match acp.update {
446                acp::SessionUpdate::AgentMessageChunk(chunk) => match chunk.content {
447                    acp::ContentBlock::Text(tc) => assert!(tc.text.starts_with(prefix)),
448                    _ => panic!("Expected text block"),
449                },
450                _ => panic!("Expected AgentMessageChunk"),
451            }
452        }
453    }
454
455    #[test]
456    fn test_none_events() {
457        let none_events = vec![
458            EngineEvent::TextDone,
459            EngineEvent::ThinkingStart,
460            EngineEvent::ThinkingDone,
461            EngineEvent::ResponseStart,
462            EngineEvent::ApprovalRequest {
463                id: "a".into(),
464                tool_name: "Bash".into(),
465                detail: "cmd".into(),
466                preview: None,
467                whitelist_hint: None,
468            },
469            EngineEvent::StatusUpdate {
470                model: "m".into(),
471                provider: "p".into(),
472                context_pct: 0.5,
473                approval_mode: "normal".into(),
474                active_tools: 0,
475            },
476            EngineEvent::Footer {
477                prompt_tokens: 0,
478                completion_tokens: 0,
479                cache_read_tokens: 0,
480                thinking_tokens: 0,
481                total_chars: 0,
482                elapsed_ms: 0,
483                rate: 0.0,
484                context: String::new(),
485            },
486            EngineEvent::SpinnerStart {
487                message: "x".into(),
488            },
489            EngineEvent::SpinnerStop,
490            EngineEvent::TurnStart {
491                turn_id: "t1".into(),
492            },
493            EngineEvent::TurnEnd {
494                turn_id: "t1".into(),
495                reason: koda_core::engine::event::TurnEndReason::Complete,
496            },
497            EngineEvent::LoopCapReached {
498                cap: 200,
499                recent_tools: vec![],
500            },
501        ];
502        for event in none_events {
503            assert!(
504                engine_event_to_acp(&event, "s1").is_none(),
505                "Expected None for {event:?}"
506            );
507        }
508    }
509
510    #[test]
511    fn test_map_tool_kind() {
512        assert_eq!(map_tool_kind("Read"), acp::ToolKind::Read);
513        assert_eq!(map_tool_kind("Write"), acp::ToolKind::Edit);
514        assert_eq!(map_tool_kind("Edit"), acp::ToolKind::Edit);
515        assert_eq!(map_tool_kind("Bash"), acp::ToolKind::Execute);
516        assert_eq!(map_tool_kind("Grep"), acp::ToolKind::Search);
517        assert_eq!(map_tool_kind("Glob"), acp::ToolKind::Search);
518        assert_eq!(map_tool_kind("Delete"), acp::ToolKind::Delete);
519        assert_eq!(map_tool_kind("WebFetch"), acp::ToolKind::Fetch);
520        assert_eq!(map_tool_kind("Think"), acp::ToolKind::Think);
521        assert_eq!(map_tool_kind("Unknown"), acp::ToolKind::Other);
522    }
523}