1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13 Notification(acp::SessionNotification),
14 PermissionRequest {
15 rpc_id: acp::RequestId,
16 request: acp::RequestPermissionRequest,
17 },
18}
19
20pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22 match name {
23 "Read" => acp::ToolKind::Read,
24 "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25 "Bash" | "Shell" => acp::ToolKind::Execute,
26 "Grep" | "Glob" => acp::ToolKind::Search,
27 "Delete" => acp::ToolKind::Delete,
28 "WebFetch" => acp::ToolKind::Fetch,
29 "Think" => acp::ToolKind::Think,
30 _ => acp::ToolKind::Other,
31 }
32}
33
34pub fn engine_event_to_acp(
39 event: &EngineEvent,
40 session_id: &str,
41) -> Option<acp::SessionNotification> {
42 match event {
43 EngineEvent::TextDelta { text } => {
44 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45 Some(acp::SessionNotification::new(
46 session_id.to_string(),
47 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48 ))
49 }
50 EngineEvent::TextDone => None,
51 EngineEvent::ThinkingStart => None,
52 EngineEvent::ThinkingDelta { text } => {
53 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54 Some(acp::SessionNotification::new(
55 session_id.to_string(),
56 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57 ))
58 }
59 EngineEvent::ThinkingDone => None,
60 EngineEvent::ResponseStart => None,
61
62 EngineEvent::ToolCallStart { id, name, args, .. } => {
63 let tc = acp::ToolCall::new(id.clone(), name.clone())
64 .kind(map_tool_kind(name))
65 .status(acp::ToolCallStatus::InProgress)
66 .raw_input(Some(args.clone()));
67 Some(acp::SessionNotification::new(
68 session_id.to_string(),
69 acp::SessionUpdate::ToolCall(tc),
70 ))
71 }
72
73 EngineEvent::ToolCallResult {
74 id,
75 name: _,
76 output,
77 } => {
78 let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79 acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80 ))];
81 let fields = acp::ToolCallUpdateFields::new()
82 .status(acp::ToolCallStatus::Completed)
83 .content(content);
84 let update = acp::ToolCallUpdate::new(id.clone(), fields);
85 Some(acp::SessionNotification::new(
86 session_id.to_string(),
87 acp::SessionUpdate::ToolCallUpdate(update),
88 ))
89 }
90
91 EngineEvent::SubAgentStart { agent_name } => {
92 let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93 .kind(acp::ToolKind::Other)
94 .status(acp::ToolCallStatus::InProgress);
95 Some(acp::SessionNotification::new(
96 session_id.to_string(),
97 acp::SessionUpdate::ToolCall(tc),
98 ))
99 }
100
101 EngineEvent::ApprovalRequest { .. } => None,
103
104 EngineEvent::ActionBlocked {
105 tool_name: _,
106 detail,
107 ..
108 } => {
109 let fields = acp::ToolCallUpdateFields::new()
110 .status(acp::ToolCallStatus::Failed)
111 .title(format!("Blocked: {detail}"));
112 let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
113 Some(acp::SessionNotification::new(
114 session_id.to_string(),
115 acp::SessionUpdate::ToolCallUpdate(update),
116 ))
117 }
118
119 EngineEvent::StatusUpdate { .. } => None,
120 EngineEvent::Footer { .. } => None,
121 EngineEvent::SpinnerStart { .. } => None,
122 EngineEvent::SpinnerStop => None,
123 EngineEvent::TurnStart { .. } => None,
124 EngineEvent::TurnEnd { .. } => None,
125 EngineEvent::LoopCapReached { .. } => None,
126
127 EngineEvent::Info { message } => {
128 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
129 Some(acp::SessionNotification::new(
130 session_id.to_string(),
131 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
132 ))
133 }
134 EngineEvent::Warn { message } => {
135 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
136 Some(acp::SessionNotification::new(
137 session_id.to_string(),
138 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
139 ))
140 }
141 EngineEvent::Error { message } => {
142 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
143 Some(acp::SessionNotification::new(
144 session_id.to_string(),
145 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
146 ))
147 }
148 }
149}
150
/// Bookkeeping for a permission request that has been sent to the client
/// and is still awaiting an answer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingApproval {
    /// Id of the engine approval request that the eventual answer is sent back under.
    pub engine_approval_id: String,
}
156
157pub struct AcpSink {
160 session_id: String,
161 tx: mpsc::Sender<AcpOutgoing>,
162 #[allow(dead_code)]
165 cmd_tx: mpsc::Sender<EngineCommand>,
166 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
167 next_rpc_id: Arc<AtomicI64>,
168}
169
170impl AcpSink {
171 pub fn new(
172 session_id: String,
173 tx: mpsc::Sender<AcpOutgoing>,
174 cmd_tx: mpsc::Sender<EngineCommand>,
175 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
176 next_rpc_id: Arc<AtomicI64>,
177 ) -> Self {
178 Self {
179 session_id,
180 tx,
181 cmd_tx,
182 pending_approvals,
183 next_rpc_id,
184 }
185 }
186}
187
188impl EngineSink for AcpSink {
189 fn emit(&self, event: EngineEvent) {
190 if let EngineEvent::ApprovalRequest {
192 ref id,
193 ref tool_name,
194 ref detail,
195 ..
196 } = event
197 {
198 let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
199 let rpc_id = acp::RequestId::Number(rpc_id_num);
200
201 let tc_fields = acp::ToolCallUpdateFields::new()
203 .status(acp::ToolCallStatus::Pending)
204 .title(detail.clone());
205 let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
206
207 let options = vec![
208 acp::PermissionOption::new(
209 "approve",
210 "Approve",
211 acp::PermissionOptionKind::AllowOnce,
212 ),
213 acp::PermissionOption::new(
214 "reject",
215 "Reject",
216 acp::PermissionOptionKind::RejectOnce,
217 ),
218 acp::PermissionOption::new(
219 "always_allow",
220 "Always Allow",
221 acp::PermissionOptionKind::AllowAlways,
222 ),
223 ];
224
225 let request =
226 acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
227
228 self.pending_approvals.lock().unwrap().insert(
230 rpc_id.clone(),
231 PendingApproval {
232 engine_approval_id: id.clone(),
233 },
234 );
235
236 let _ = self
237 .tx
238 .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
239 return;
240 }
241
242 if matches!(event, EngineEvent::LoopCapReached { .. }) {
244 let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
245 action: koda_core::loop_guard::LoopContinuation::Continue200,
246 });
247 return;
248 }
249
250 if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
252 let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
253 }
254 }
255}
256
257pub fn resolve_permission_response(
260 pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
261 rpc_id: &acp::RequestId,
262 outcome: &acp::RequestPermissionOutcome,
263 cmd_tx: &mpsc::Sender<EngineCommand>,
264) -> bool {
265 let pending = pending_approvals.lock().unwrap().remove(rpc_id);
266 if let Some(approval) = pending {
267 let decision = match outcome {
268 acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
269 acp::RequestPermissionOutcome::Selected(selected) => {
270 match selected.option_id.0.as_ref() {
271 "approve" => ApprovalDecision::Approve,
272 _ => ApprovalDecision::Reject,
273 }
274 }
275 _ => ApprovalDecision::Reject,
276 };
277 let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
278 id: approval.engine_approval_id,
279 decision,
280 });
281 true
282 } else {
283 false
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts `event` for session `"s1"` and returns the update it produced,
    /// panicking if the event maps to no notification.
    fn update_for(event: &EngineEvent) -> acp::SessionUpdate {
        engine_event_to_acp(event, "s1").unwrap().update
    }

    /// Unwraps a text content block, panicking on any other block type.
    fn text_of(block: acp::ContentBlock) -> acp::TextContent {
        match block {
            acp::ContentBlock::Text(content) => content,
            _ => panic!("Expected text block"),
        }
    }

    #[test]
    fn test_text_delta() {
        let event = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let notification = engine_event_to_acp(&event, "session-1").unwrap();

        assert_eq!(notification.session_id, "session-1".to_string().into());
        match notification.update {
            acp::SessionUpdate::AgentMessageChunk(chunk) => {
                assert_eq!(text_of(chunk.content).text, "hello");
            }
            _ => panic!("Expected AgentMessageChunk"),
        }
    }

    #[test]
    fn test_thinking_delta() {
        let event = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        match update_for(&event) {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => {
                assert_eq!(text_of(chunk.content).text, "reasoning...");
            }
            _ => panic!("Expected AgentThoughtChunk"),
        }
    }

    #[test]
    fn test_tool_call_start() {
        let event = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        match update_for(&event) {
            acp::SessionUpdate::ToolCall(call) => {
                assert_eq!(call.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(call.title, "Bash");
                assert_eq!(call.kind, acp::ToolKind::Execute);
                assert_eq!(call.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    #[test]
    fn test_tool_call_result() {
        let event = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        match update_for(&event) {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    #[test]
    fn test_sub_agent_start() {
        let event = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        match update_for(&event) {
            acp::SessionUpdate::ToolCall(call) => {
                assert_eq!(call.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(call.kind, acp::ToolKind::Other);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    #[test]
    fn test_action_blocked() {
        let event = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        match update_for(&event) {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    #[test]
    fn test_info_warn_error() {
        let cases = [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ];
        for (event, prefix) in cases {
            match update_for(&event) {
                acp::SessionUpdate::AgentMessageChunk(chunk) => {
                    assert!(text_of(chunk.content).text.starts_with(prefix));
                }
                _ => panic!("Expected AgentMessageChunk"),
            }
        }
    }

    #[test]
    fn test_none_events() {
        // Every event here is expected to produce no notification at all.
        let silent = vec![
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
            EngineEvent::TurnStart {
                turn_id: "t1".into(),
            },
            EngineEvent::TurnEnd {
                turn_id: "t1".into(),
                reason: koda_core::engine::event::TurnEndReason::Complete,
            },
            EngineEvent::LoopCapReached {
                cap: 200,
                recent_tools: vec![],
            },
        ];
        for event in &silent {
            assert!(
                engine_event_to_acp(event, "s1").is_none(),
                "Expected None for {event:?}"
            );
        }
    }

    #[test]
    fn test_map_tool_kind() {
        let expectations = [
            ("Read", acp::ToolKind::Read),
            ("Write", acp::ToolKind::Edit),
            ("Edit", acp::ToolKind::Edit),
            ("Bash", acp::ToolKind::Execute),
            ("Grep", acp::ToolKind::Search),
            ("Glob", acp::ToolKind::Search),
            ("Delete", acp::ToolKind::Delete),
            ("WebFetch", acp::ToolKind::Fetch),
            ("Think", acp::ToolKind::Think),
            ("Unknown", acp::ToolKind::Other),
        ];
        for (name, expected) in expectations {
            assert_eq!(map_tool_kind(name), expected);
        }
    }
}