Skip to main content

koda_cli/
acp_adapter.rs

1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9/// Outgoing messages from the ACP adapter — either session notifications or
10/// permission requests (which are JSON-RPC requests the *agent* sends to the *client*).
11#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13    Notification(acp::SessionNotification),
14    PermissionRequest {
15        rpc_id: acp::RequestId,
16        request: acp::RequestPermissionRequest,
17    },
18}
19
20/// Maps a Koda tool name to the ACP `ToolKind` enum.
21pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22    match name {
23        "Read" => acp::ToolKind::Read,
24        "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25        "Bash" | "Shell" => acp::ToolKind::Execute,
26        "Grep" | "Glob" => acp::ToolKind::Search,
27        "Delete" => acp::ToolKind::Delete,
28        "WebFetch" => acp::ToolKind::Fetch,
29        "Think" => acp::ToolKind::Think,
30        _ => acp::ToolKind::Other,
31    }
32}
33
34/// Translates an internal `EngineEvent` to an ACP `SessionNotification`.
35///
36/// Returns `None` for events that have no ACP equivalent (UI-only signals)
37/// or that are handled specially (e.g. `ApprovalRequest`).
38pub fn engine_event_to_acp(
39    event: &EngineEvent,
40    session_id: &str,
41) -> Option<acp::SessionNotification> {
42    match event {
43        EngineEvent::TextDelta { text } => {
44            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45            Some(acp::SessionNotification::new(
46                session_id.to_string(),
47                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48            ))
49        }
50        EngineEvent::TextDone => None,
51        EngineEvent::ThinkingStart => None,
52        EngineEvent::ThinkingDelta { text } => {
53            let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54            Some(acp::SessionNotification::new(
55                session_id.to_string(),
56                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57            ))
58        }
59        EngineEvent::ThinkingDone => None,
60        EngineEvent::ResponseStart => None,
61
62        EngineEvent::ToolCallStart { id, name, args, .. } => {
63            let tc = acp::ToolCall::new(id.clone(), name.clone())
64                .kind(map_tool_kind(name))
65                .status(acp::ToolCallStatus::InProgress)
66                .raw_input(Some(args.clone()));
67            Some(acp::SessionNotification::new(
68                session_id.to_string(),
69                acp::SessionUpdate::ToolCall(tc),
70            ))
71        }
72
73        // Streaming output lines — not mapped to ACP events (yet).
74        EngineEvent::ToolOutputLine { .. } => None,
75
76        EngineEvent::ToolCallResult {
77            id,
78            name: _,
79            output,
80        } => {
81            let content = vec![acp::ToolCallContent::Content(acp::Content::new(
82                acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
83            ))];
84            let fields = acp::ToolCallUpdateFields::new()
85                .status(acp::ToolCallStatus::Completed)
86                .content(content);
87            let update = acp::ToolCallUpdate::new(id.clone(), fields);
88            Some(acp::SessionNotification::new(
89                session_id.to_string(),
90                acp::SessionUpdate::ToolCallUpdate(update),
91            ))
92        }
93
94        EngineEvent::SubAgentStart { agent_name } => {
95            let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
96                .kind(acp::ToolKind::Other)
97                .status(acp::ToolCallStatus::InProgress);
98            Some(acp::SessionNotification::new(
99                session_id.to_string(),
100                acp::SessionUpdate::ToolCall(tc),
101            ))
102        }
103
104        // Handled specially by AcpSink (bidirectional permission flow)
105        EngineEvent::ApprovalRequest { .. } => None,
106        // AskUser not yet implemented in ACP protocol; filtered here.
107        // AcpSink::emit auto-responds with an empty string (fallback).
108        EngineEvent::AskUserRequest { .. } => None,
109
110        EngineEvent::ActionBlocked {
111            tool_name: _,
112            detail,
113            ..
114        } => {
115            let fields = acp::ToolCallUpdateFields::new()
116                .status(acp::ToolCallStatus::Failed)
117                .title(format!("Blocked: {detail}"));
118            let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
119            Some(acp::SessionNotification::new(
120                session_id.to_string(),
121                acp::SessionUpdate::ToolCallUpdate(update),
122            ))
123        }
124
125        EngineEvent::StatusUpdate { .. } => None,
126        EngineEvent::ContextUsage { .. } => None,
127        EngineEvent::Footer { .. } => None,
128        EngineEvent::SpinnerStart { .. } => None,
129        EngineEvent::SpinnerStop => None,
130        EngineEvent::TurnStart { .. } => None,
131        EngineEvent::TurnEnd { .. } => None,
132        EngineEvent::LoopCapReached { .. } => None,
133
134        EngineEvent::Info { message } => {
135            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
136            Some(acp::SessionNotification::new(
137                session_id.to_string(),
138                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
139            ))
140        }
141        EngineEvent::Warn { message } => {
142            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
143            Some(acp::SessionNotification::new(
144                session_id.to_string(),
145                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
146            ))
147        }
148        EngineEvent::Error { message } => {
149            let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
150            Some(acp::SessionNotification::new(
151                session_id.to_string(),
152                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
153            ))
154        }
155    }
156}
157
158/// Pending approval context: maps an outgoing JSON-RPC request ID back to the
159/// engine approval ID so we can route the client's response correctly.
160pub struct PendingApproval {
161    pub engine_approval_id: String,
162}
163
164/// ACP sink that translates EngineEvents to ACP messages and handles
165/// the bidirectional approval flow.
166pub struct AcpSink {
167    session_id: String,
168    tx: mpsc::Sender<AcpOutgoing>,
169    /// Kept for future bidirectional approval flow where the server reads
170    /// permission responses from stdin and routes them back to the engine.
171    #[allow(dead_code)]
172    cmd_tx: mpsc::Sender<EngineCommand>,
173    pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
174    next_rpc_id: Arc<AtomicI64>,
175}
176
177impl AcpSink {
178    pub fn new(
179        session_id: String,
180        tx: mpsc::Sender<AcpOutgoing>,
181        cmd_tx: mpsc::Sender<EngineCommand>,
182        pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
183        next_rpc_id: Arc<AtomicI64>,
184    ) -> Self {
185        Self {
186            session_id,
187            tx,
188            cmd_tx,
189            pending_approvals,
190            next_rpc_id,
191        }
192    }
193}
194
195impl EngineSink for AcpSink {
196    fn emit(&self, event: EngineEvent) {
197        // Handle approval requests specially — they become outgoing JSON-RPC requests
198        if let EngineEvent::ApprovalRequest {
199            ref id,
200            ref tool_name,
201            ref detail,
202            ..
203        } = event
204        {
205            let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
206            let rpc_id = acp::RequestId::Number(rpc_id_num);
207
208            // Build the permission request
209            let tc_fields = acp::ToolCallUpdateFields::new()
210                .status(acp::ToolCallStatus::Pending)
211                .title(detail.clone());
212            let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
213
214            let options = vec![
215                acp::PermissionOption::new(
216                    "approve",
217                    "Approve",
218                    acp::PermissionOptionKind::AllowOnce,
219                ),
220                acp::PermissionOption::new(
221                    "reject",
222                    "Reject",
223                    acp::PermissionOptionKind::RejectOnce,
224                ),
225                acp::PermissionOption::new(
226                    "always_allow",
227                    "Always Allow",
228                    acp::PermissionOptionKind::AllowAlways,
229                ),
230            ];
231
232            let request =
233                acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
234
235            // Store mapping so we can route the response back
236            self.pending_approvals.lock().unwrap().insert(
237                rpc_id.clone(),
238                PendingApproval {
239                    engine_approval_id: id.clone(),
240                },
241            );
242
243            let _ = self
244                .tx
245                .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
246            return;
247        }
248
249        // Handle loop cap — server always auto-continues
250        if matches!(event, EngineEvent::LoopCapReached { .. }) {
251            let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
252                action: koda_core::loop_guard::LoopContinuation::Continue200,
253            });
254            return;
255        }
256
257        // AskUser: no ACP protocol support yet — auto-respond with empty string.
258        if let EngineEvent::AskUserRequest { ref id, .. } = event {
259            let _ = self.cmd_tx.try_send(EngineCommand::AskUserResponse {
260                id: id.clone(),
261                answer: String::new(),
262            });
263            return;
264        }
265
266        // All other events go through the standard mapping
267        if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
268            let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
269        }
270    }
271}
272
273/// Resolve an ACP permission response to an engine approval command.
274/// Returns the `EngineCommand::ApprovalResponse` if the RPC ID matches a pending approval.
275pub fn resolve_permission_response(
276    pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
277    rpc_id: &acp::RequestId,
278    outcome: &acp::RequestPermissionOutcome,
279    cmd_tx: &mpsc::Sender<EngineCommand>,
280) -> bool {
281    let pending = pending_approvals.lock().unwrap().remove(rpc_id);
282    if let Some(approval) = pending {
283        let decision = match outcome {
284            acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
285            acp::RequestPermissionOutcome::Selected(selected) => {
286                match selected.option_id.0.as_ref() {
287                    "approve" => ApprovalDecision::Approve,
288                    _ => ApprovalDecision::Reject,
289                }
290            }
291            _ => ApprovalDecision::Reject,
292        };
293        let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
294            id: approval.engine_approval_id,
295            decision,
296        });
297        true
298    } else {
299        false
300    }
301}
302
303#[cfg(test)]
304mod tests {
305    use super::*;
306
307    #[test]
308    fn test_text_delta() {
309        let event = EngineEvent::TextDelta {
310            text: "hello".into(),
311        };
312        let acp = engine_event_to_acp(&event, "session-1").unwrap();
313
314        assert_eq!(acp.session_id, "session-1".to_string().into());
315        match acp.update {
316            acp::SessionUpdate::AgentMessageChunk(chunk) => {
317                let block = chunk.content;
318                match block {
319                    acp::ContentBlock::Text(text_content) => {
320                        assert_eq!(text_content.text, "hello");
321                    }
322                    _ => panic!("Expected text block"),
323                }
324            }
325            _ => panic!("Expected AgentMessageChunk"),
326        }
327    }
328
329    #[test]
330    fn test_thinking_delta() {
331        let event = EngineEvent::ThinkingDelta {
332            text: "reasoning...".into(),
333        };
334        let acp = engine_event_to_acp(&event, "s1").unwrap();
335        match acp.update {
336            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
337                acp::ContentBlock::Text(tc) => assert_eq!(tc.text, "reasoning..."),
338                _ => panic!("Expected text block"),
339            },
340            _ => panic!("Expected AgentThoughtChunk"),
341        }
342    }
343
344    #[test]
345    fn test_tool_call_start() {
346        let event = EngineEvent::ToolCallStart {
347            id: "call_1".into(),
348            name: "Bash".into(),
349            args: serde_json::json!({"command": "ls"}),
350            is_sub_agent: false,
351        };
352        let acp = engine_event_to_acp(&event, "s1").unwrap();
353        match acp.update {
354            acp::SessionUpdate::ToolCall(tc) => {
355                assert_eq!(tc.tool_call_id.0.as_ref(), "call_1");
356                assert_eq!(tc.title, "Bash");
357                assert_eq!(tc.kind, acp::ToolKind::Execute);
358                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
359            }
360            _ => panic!("Expected ToolCall"),
361        }
362    }
363
364    #[test]
365    fn test_tool_call_result() {
366        let event = EngineEvent::ToolCallResult {
367            id: "call_1".into(),
368            name: "Read".into(),
369            output: "file contents".into(),
370        };
371        let acp = engine_event_to_acp(&event, "s1").unwrap();
372        match acp.update {
373            acp::SessionUpdate::ToolCallUpdate(update) => {
374                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
375                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
376            }
377            _ => panic!("Expected ToolCallUpdate"),
378        }
379    }
380
381    #[test]
382    fn test_sub_agent_start() {
383        let event = EngineEvent::SubAgentStart {
384            agent_name: "reviewer".into(),
385        };
386        let acp = engine_event_to_acp(&event, "s1").unwrap();
387        match acp.update {
388            acp::SessionUpdate::ToolCall(tc) => {
389                assert_eq!(tc.tool_call_id.0.as_ref(), "reviewer");
390                assert_eq!(tc.kind, acp::ToolKind::Other);
391            }
392            _ => panic!("Expected ToolCall"),
393        }
394    }
395
396    #[test]
397    fn test_action_blocked() {
398        let event = EngineEvent::ActionBlocked {
399            tool_name: "Bash".into(),
400            detail: "rm -rf /".into(),
401            preview: None,
402        };
403        let acp = engine_event_to_acp(&event, "s1").unwrap();
404        match acp.update {
405            acp::SessionUpdate::ToolCallUpdate(update) => {
406                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
407                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
408            }
409            _ => panic!("Expected ToolCallUpdate"),
410        }
411    }
412
413    #[test]
414    fn test_info_warn_error() {
415        for (event, prefix) in [
416            (
417                EngineEvent::Info {
418                    message: "hello".into(),
419                },
420                "[info]",
421            ),
422            (
423                EngineEvent::Warn {
424                    message: "watch out".into(),
425                },
426                "[warn]",
427            ),
428            (
429                EngineEvent::Error {
430                    message: "oops".into(),
431                },
432                "[error]",
433            ),
434        ] {
435            let acp = engine_event_to_acp(&event, "s1").unwrap();
436            match acp.update {
437                acp::SessionUpdate::AgentMessageChunk(chunk) => match chunk.content {
438                    acp::ContentBlock::Text(tc) => assert!(tc.text.starts_with(prefix)),
439                    _ => panic!("Expected text block"),
440                },
441                _ => panic!("Expected AgentMessageChunk"),
442            }
443        }
444    }
445
446    #[test]
447    fn test_none_events() {
448        let none_events = vec![
449            EngineEvent::TextDone,
450            EngineEvent::ThinkingStart,
451            EngineEvent::ThinkingDone,
452            EngineEvent::ResponseStart,
453            EngineEvent::ApprovalRequest {
454                id: "a".into(),
455                tool_name: "Bash".into(),
456                detail: "cmd".into(),
457                preview: None,
458            },
459            EngineEvent::AskUserRequest {
460                id: "b".into(),
461                question: "Which db?".into(),
462                options: vec![],
463            },
464            EngineEvent::StatusUpdate {
465                model: "m".into(),
466                provider: "p".into(),
467                context_pct: 0.5,
468                approval_mode: "normal".into(),
469                active_tools: 0,
470            },
471            EngineEvent::Footer {
472                prompt_tokens: 0,
473                completion_tokens: 0,
474                cache_read_tokens: 0,
475                thinking_tokens: 0,
476                total_chars: 0,
477                elapsed_ms: 0,
478                rate: 0.0,
479                context: String::new(),
480            },
481            EngineEvent::SpinnerStart {
482                message: "x".into(),
483            },
484            EngineEvent::SpinnerStop,
485            EngineEvent::TurnStart {
486                turn_id: "t1".into(),
487            },
488            EngineEvent::TurnEnd {
489                turn_id: "t1".into(),
490                reason: koda_core::engine::event::TurnEndReason::Complete,
491            },
492            EngineEvent::LoopCapReached {
493                cap: 200,
494                recent_tools: vec![],
495            },
496        ];
497        for event in none_events {
498            assert!(
499                engine_event_to_acp(&event, "s1").is_none(),
500                "Expected None for {event:?}"
501            );
502        }
503    }
504
505    #[test]
506    fn test_map_tool_kind() {
507        assert_eq!(map_tool_kind("Read"), acp::ToolKind::Read);
508        assert_eq!(map_tool_kind("Write"), acp::ToolKind::Edit);
509        assert_eq!(map_tool_kind("Edit"), acp::ToolKind::Edit);
510        assert_eq!(map_tool_kind("Bash"), acp::ToolKind::Execute);
511        assert_eq!(map_tool_kind("Grep"), acp::ToolKind::Search);
512        assert_eq!(map_tool_kind("Glob"), acp::ToolKind::Search);
513        assert_eq!(map_tool_kind("Delete"), acp::ToolKind::Delete);
514        assert_eq!(map_tool_kind("WebFetch"), acp::ToolKind::Fetch);
515        assert_eq!(map_tool_kind("Think"), acp::ToolKind::Think);
516        assert_eq!(map_tool_kind("Unknown"), acp::ToolKind::Other);
517    }
518}