1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13 Notification(acp::SessionNotification),
14 PermissionRequest {
15 rpc_id: acp::RequestId,
16 request: acp::RequestPermissionRequest,
17 },
18}
19
20pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22 match name {
23 "Read" => acp::ToolKind::Read,
24 "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25 "Bash" | "Shell" => acp::ToolKind::Execute,
26 "Grep" | "Glob" => acp::ToolKind::Search,
27 "Delete" => acp::ToolKind::Delete,
28 "WebFetch" => acp::ToolKind::Fetch,
29 "Think" => acp::ToolKind::Think,
30 _ => acp::ToolKind::Other,
31 }
32}
33
34pub fn engine_event_to_acp(
39 event: &EngineEvent,
40 session_id: &str,
41) -> Option<acp::SessionNotification> {
42 match event {
43 EngineEvent::TextDelta { text } => {
44 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45 Some(acp::SessionNotification::new(
46 session_id.to_string(),
47 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48 ))
49 }
50 EngineEvent::TextDone => None,
51 EngineEvent::ThinkingStart => None,
52 EngineEvent::ThinkingDelta { text } => {
53 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54 Some(acp::SessionNotification::new(
55 session_id.to_string(),
56 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57 ))
58 }
59 EngineEvent::ThinkingDone => None,
60 EngineEvent::ResponseStart => None,
61
62 EngineEvent::ToolCallStart { id, name, args, .. } => {
63 let tc = acp::ToolCall::new(id.clone(), name.clone())
64 .kind(map_tool_kind(name))
65 .status(acp::ToolCallStatus::InProgress)
66 .raw_input(Some(args.clone()));
67 Some(acp::SessionNotification::new(
68 session_id.to_string(),
69 acp::SessionUpdate::ToolCall(tc),
70 ))
71 }
72
73 EngineEvent::ToolCallResult {
74 id,
75 name: _,
76 output,
77 } => {
78 let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79 acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80 ))];
81 let fields = acp::ToolCallUpdateFields::new()
82 .status(acp::ToolCallStatus::Completed)
83 .content(content);
84 let update = acp::ToolCallUpdate::new(id.clone(), fields);
85 Some(acp::SessionNotification::new(
86 session_id.to_string(),
87 acp::SessionUpdate::ToolCallUpdate(update),
88 ))
89 }
90
91 EngineEvent::SubAgentStart { agent_name } => {
92 let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93 .kind(acp::ToolKind::Other)
94 .status(acp::ToolCallStatus::InProgress);
95 Some(acp::SessionNotification::new(
96 session_id.to_string(),
97 acp::SessionUpdate::ToolCall(tc),
98 ))
99 }
100
101 EngineEvent::SubAgentEnd { agent_name } => {
102 let fields = acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed);
103 let update = acp::ToolCallUpdate::new(agent_name.clone(), fields);
104 Some(acp::SessionNotification::new(
105 session_id.to_string(),
106 acp::SessionUpdate::ToolCallUpdate(update),
107 ))
108 }
109
110 EngineEvent::ApprovalRequest { .. } => None,
112
113 EngineEvent::ActionBlocked {
114 tool_name: _,
115 detail,
116 ..
117 } => {
118 let fields = acp::ToolCallUpdateFields::new()
119 .status(acp::ToolCallStatus::Failed)
120 .title(format!("Blocked: {detail}"));
121 let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
122 Some(acp::SessionNotification::new(
123 session_id.to_string(),
124 acp::SessionUpdate::ToolCallUpdate(update),
125 ))
126 }
127
128 EngineEvent::StatusUpdate { .. } => None,
129 EngineEvent::Footer { .. } => None,
130 EngineEvent::SpinnerStart { .. } => None,
131 EngineEvent::SpinnerStop => None,
132 EngineEvent::TodoDisplay { .. } => None,
133 EngineEvent::TurnStart { .. } => None,
134 EngineEvent::TurnEnd { .. } => None,
135 EngineEvent::LoopCapReached { .. } => None,
136
137 EngineEvent::Info { message } => {
138 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
139 Some(acp::SessionNotification::new(
140 session_id.to_string(),
141 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
142 ))
143 }
144 EngineEvent::Warn { message } => {
145 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
146 Some(acp::SessionNotification::new(
147 session_id.to_string(),
148 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
149 ))
150 }
151 EngineEvent::Error { message } => {
152 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
153 Some(acp::SessionNotification::new(
154 session_id.to_string(),
155 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
156 ))
157 }
158 }
159}
160
/// Bookkeeping for a permission request that has not been resolved yet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingApproval {
    /// The engine's own id for the approval; sent back to the engine with
    /// the decision once the request is resolved.
    pub engine_approval_id: String,
}
166
167pub struct AcpSink {
170 session_id: String,
171 tx: mpsc::Sender<AcpOutgoing>,
172 #[allow(dead_code)]
175 cmd_tx: mpsc::Sender<EngineCommand>,
176 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
177 next_rpc_id: Arc<AtomicI64>,
178}
179
180impl AcpSink {
181 pub fn new(
182 session_id: String,
183 tx: mpsc::Sender<AcpOutgoing>,
184 cmd_tx: mpsc::Sender<EngineCommand>,
185 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
186 next_rpc_id: Arc<AtomicI64>,
187 ) -> Self {
188 Self {
189 session_id,
190 tx,
191 cmd_tx,
192 pending_approvals,
193 next_rpc_id,
194 }
195 }
196}
197
198impl EngineSink for AcpSink {
199 fn emit(&self, event: EngineEvent) {
200 if let EngineEvent::ApprovalRequest {
202 ref id,
203 ref tool_name,
204 ref detail,
205 ..
206 } = event
207 {
208 let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
209 let rpc_id = acp::RequestId::Number(rpc_id_num);
210
211 let tc_fields = acp::ToolCallUpdateFields::new()
213 .status(acp::ToolCallStatus::Pending)
214 .title(detail.clone());
215 let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
216
217 let options = vec![
218 acp::PermissionOption::new(
219 "approve",
220 "Approve",
221 acp::PermissionOptionKind::AllowOnce,
222 ),
223 acp::PermissionOption::new(
224 "reject",
225 "Reject",
226 acp::PermissionOptionKind::RejectOnce,
227 ),
228 acp::PermissionOption::new(
229 "always_allow",
230 "Always Allow",
231 acp::PermissionOptionKind::AllowAlways,
232 ),
233 ];
234
235 let request =
236 acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
237
238 self.pending_approvals.lock().unwrap().insert(
240 rpc_id.clone(),
241 PendingApproval {
242 engine_approval_id: id.clone(),
243 },
244 );
245
246 let _ = self
247 .tx
248 .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
249 return;
250 }
251
252 if matches!(event, EngineEvent::LoopCapReached { .. }) {
254 let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
255 action: koda_core::loop_guard::LoopContinuation::Continue200,
256 });
257 return;
258 }
259
260 if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
262 let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
263 }
264 }
265}
266
267pub fn resolve_permission_response(
270 pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
271 rpc_id: &acp::RequestId,
272 outcome: &acp::RequestPermissionOutcome,
273 cmd_tx: &mpsc::Sender<EngineCommand>,
274) -> bool {
275 let pending = pending_approvals.lock().unwrap().remove(rpc_id);
276 if let Some(approval) = pending {
277 let decision = match outcome {
278 acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
279 acp::RequestPermissionOutcome::Selected(selected) => {
280 match selected.option_id.0.as_ref() {
281 "approve" => ApprovalDecision::Approve,
282 "always_allow" => ApprovalDecision::AlwaysAllow,
283 _ => ApprovalDecision::Reject,
284 }
285 }
286 _ => ApprovalDecision::Reject,
287 };
288 let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
289 id: approval.engine_approval_id,
290 decision,
291 });
292 true
293 } else {
294 false
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Session id used where the test does not inspect it.
    const SESSION: &str = "s1";

    /// Translates `event` and returns its update; panics if the event maps to nothing.
    fn update_for(event: &EngineEvent) -> acp::SessionUpdate {
        engine_event_to_acp(event, SESSION).unwrap().update
    }

    /// Extracts the text payload of a chunk; panics on any other block type.
    fn text_of(chunk: acp::ContentChunk) -> acp::TextContent {
        match chunk.content {
            acp::ContentBlock::Text(text) => text,
            _ => panic!("Expected text block"),
        }
    }

    #[test]
    fn test_text_delta() {
        let event = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let notification = engine_event_to_acp(&event, "session-1").unwrap();

        assert_eq!(notification.session_id, "session-1".to_string().into());
        if let acp::SessionUpdate::AgentMessageChunk(chunk) = notification.update {
            assert_eq!(text_of(chunk).text, "hello");
        } else {
            panic!("Expected AgentMessageChunk");
        }
    }

    #[test]
    fn test_thinking_delta() {
        let event = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        if let acp::SessionUpdate::AgentThoughtChunk(chunk) = update_for(&event) {
            assert_eq!(text_of(chunk).text, "reasoning...");
        } else {
            panic!("Expected AgentThoughtChunk");
        }
    }

    #[test]
    fn test_tool_call_start() {
        let event = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        if let acp::SessionUpdate::ToolCall(call) = update_for(&event) {
            assert_eq!(call.tool_call_id.0.as_ref(), "call_1");
            assert_eq!(call.title, "Bash");
            assert_eq!(call.kind, acp::ToolKind::Execute);
            assert_eq!(call.status, acp::ToolCallStatus::InProgress);
        } else {
            panic!("Expected ToolCall");
        }
    }

    #[test]
    fn test_tool_call_result() {
        let event = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        if let acp::SessionUpdate::ToolCallUpdate(update) = update_for(&event) {
            assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
            assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
        } else {
            panic!("Expected ToolCallUpdate");
        }
    }

    #[test]
    fn test_sub_agent_start() {
        let event = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        if let acp::SessionUpdate::ToolCall(call) = update_for(&event) {
            assert_eq!(call.tool_call_id.0.as_ref(), "reviewer");
            assert_eq!(call.kind, acp::ToolKind::Other);
        } else {
            panic!("Expected ToolCall");
        }
    }

    #[test]
    fn test_sub_agent_end() {
        let event = EngineEvent::SubAgentEnd {
            agent_name: "reviewer".into(),
        };
        if let acp::SessionUpdate::ToolCallUpdate(update) = update_for(&event) {
            assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
        } else {
            panic!("Expected ToolCallUpdate");
        }
    }

    #[test]
    fn test_action_blocked() {
        let event = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        if let acp::SessionUpdate::ToolCallUpdate(update) = update_for(&event) {
            assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
            assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
        } else {
            panic!("Expected ToolCallUpdate");
        }
    }

    #[test]
    fn test_info_warn_error() {
        let cases = [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ];
        for (event, prefix) in cases {
            if let acp::SessionUpdate::AgentMessageChunk(chunk) = update_for(&event) {
                assert!(text_of(chunk).text.starts_with(prefix));
            } else {
                panic!("Expected AgentMessageChunk");
            }
        }
    }

    #[test]
    fn test_none_events() {
        // Events that must not produce a notification.
        let silent = [
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
                whitelist_hint: None,
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
            EngineEvent::TurnStart {
                turn_id: "t1".into(),
            },
            EngineEvent::TurnEnd {
                turn_id: "t1".into(),
                reason: koda_core::engine::event::TurnEndReason::Complete,
            },
            EngineEvent::LoopCapReached {
                cap: 200,
                recent_tools: vec![],
            },
        ];
        for event in &silent {
            assert!(
                engine_event_to_acp(event, SESSION).is_none(),
                "Expected None for {event:?}"
            );
        }
    }

    #[test]
    fn test_map_tool_kind() {
        let expectations = [
            ("Read", acp::ToolKind::Read),
            ("Write", acp::ToolKind::Edit),
            ("Edit", acp::ToolKind::Edit),
            ("Bash", acp::ToolKind::Execute),
            ("Grep", acp::ToolKind::Search),
            ("Glob", acp::ToolKind::Search),
            ("Delete", acp::ToolKind::Delete),
            ("WebFetch", acp::ToolKind::Fetch),
            ("Think", acp::ToolKind::Think),
            ("Unknown", acp::ToolKind::Other),
        ];
        for (name, expected) in expectations {
            assert_eq!(map_tool_kind(name), expected);
        }
    }
}