1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
9#[derive(Debug, Clone)]
12pub enum AcpOutgoing {
13 Notification(acp::SessionNotification),
14 PermissionRequest {
15 rpc_id: acp::RequestId,
16 request: acp::RequestPermissionRequest,
17 },
18}
19
20pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22 match name {
23 "Read" => acp::ToolKind::Read,
24 "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25 "Bash" | "Shell" => acp::ToolKind::Execute,
26 "Grep" | "Glob" => acp::ToolKind::Search,
27 "Delete" => acp::ToolKind::Delete,
28 "WebFetch" => acp::ToolKind::Fetch,
29 "Think" => acp::ToolKind::Think,
30 _ => acp::ToolKind::Other,
31 }
32}
33
34pub fn engine_event_to_acp(
39 event: &EngineEvent,
40 session_id: &str,
41) -> Option<acp::SessionNotification> {
42 match event {
43 EngineEvent::TextDelta { text } => {
44 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45 Some(acp::SessionNotification::new(
46 session_id.to_string(),
47 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48 ))
49 }
50 EngineEvent::TextDone => None,
51 EngineEvent::ThinkingStart => None,
52 EngineEvent::ThinkingDelta { text } => {
53 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54 Some(acp::SessionNotification::new(
55 session_id.to_string(),
56 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57 ))
58 }
59 EngineEvent::ThinkingDone => None,
60 EngineEvent::ResponseStart => None,
61
62 EngineEvent::ToolCallStart { id, name, args, .. } => {
63 let tc = acp::ToolCall::new(id.clone(), name.clone())
64 .kind(map_tool_kind(name))
65 .status(acp::ToolCallStatus::InProgress)
66 .raw_input(Some(args.clone()));
67 Some(acp::SessionNotification::new(
68 session_id.to_string(),
69 acp::SessionUpdate::ToolCall(tc),
70 ))
71 }
72
73 EngineEvent::ToolOutputLine { .. } => None,
75
76 EngineEvent::ToolCallResult {
77 id,
78 name: _,
79 output,
80 } => {
81 let content = vec![acp::ToolCallContent::Content(acp::Content::new(
82 acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
83 ))];
84 let fields = acp::ToolCallUpdateFields::new()
85 .status(acp::ToolCallStatus::Completed)
86 .content(content);
87 let update = acp::ToolCallUpdate::new(id.clone(), fields);
88 Some(acp::SessionNotification::new(
89 session_id.to_string(),
90 acp::SessionUpdate::ToolCallUpdate(update),
91 ))
92 }
93
94 EngineEvent::SubAgentStart { agent_name } => {
95 let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
96 .kind(acp::ToolKind::Other)
97 .status(acp::ToolCallStatus::InProgress);
98 Some(acp::SessionNotification::new(
99 session_id.to_string(),
100 acp::SessionUpdate::ToolCall(tc),
101 ))
102 }
103
104 EngineEvent::ApprovalRequest { .. } => None,
106 EngineEvent::AskUserRequest { .. } => None,
109
110 EngineEvent::ActionBlocked {
111 tool_name: _,
112 detail,
113 ..
114 } => {
115 let fields = acp::ToolCallUpdateFields::new()
116 .status(acp::ToolCallStatus::Failed)
117 .title(format!("Blocked: {detail}"));
118 let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
119 Some(acp::SessionNotification::new(
120 session_id.to_string(),
121 acp::SessionUpdate::ToolCallUpdate(update),
122 ))
123 }
124
125 EngineEvent::StatusUpdate { .. } => None,
126 EngineEvent::ContextUsage { .. } => None,
127 EngineEvent::Footer { .. } => None,
128 EngineEvent::SpinnerStart { .. } => None,
129 EngineEvent::SpinnerStop => None,
130 EngineEvent::TurnStart { .. } => None,
131 EngineEvent::TurnEnd { .. } => None,
132 EngineEvent::LoopCapReached { .. } => None,
133
134 EngineEvent::Info { message } => {
135 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
136 Some(acp::SessionNotification::new(
137 session_id.to_string(),
138 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
139 ))
140 }
141 EngineEvent::Warn { message } => {
142 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
143 Some(acp::SessionNotification::new(
144 session_id.to_string(),
145 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
146 ))
147 }
148 EngineEvent::Error { message } => {
149 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
150 Some(acp::SessionNotification::new(
151 session_id.to_string(),
152 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
153 ))
154 }
155 }
156}
157
/// Bookkeeping for a permission request that has been sent out and is still
/// awaiting its reply.
///
/// Stored in the pending-approvals map keyed by the outgoing request id; the
/// value remembers which engine approval the reply must be forwarded to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingApproval {
    /// Id of the engine-side approval request this entry answers.
    pub engine_approval_id: String,
}
163
164pub struct AcpSink {
167 session_id: String,
168 tx: mpsc::Sender<AcpOutgoing>,
169 #[allow(dead_code)]
172 cmd_tx: mpsc::Sender<EngineCommand>,
173 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
174 next_rpc_id: Arc<AtomicI64>,
175}
176
177impl AcpSink {
178 pub fn new(
179 session_id: String,
180 tx: mpsc::Sender<AcpOutgoing>,
181 cmd_tx: mpsc::Sender<EngineCommand>,
182 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
183 next_rpc_id: Arc<AtomicI64>,
184 ) -> Self {
185 Self {
186 session_id,
187 tx,
188 cmd_tx,
189 pending_approvals,
190 next_rpc_id,
191 }
192 }
193}
194
195impl EngineSink for AcpSink {
196 fn emit(&self, event: EngineEvent) {
197 if let EngineEvent::ApprovalRequest {
199 ref id,
200 ref tool_name,
201 ref detail,
202 ..
203 } = event
204 {
205 let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
206 let rpc_id = acp::RequestId::Number(rpc_id_num);
207
208 let tc_fields = acp::ToolCallUpdateFields::new()
210 .status(acp::ToolCallStatus::Pending)
211 .title(detail.clone());
212 let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
213
214 let options = vec![
215 acp::PermissionOption::new(
216 "approve",
217 "Approve",
218 acp::PermissionOptionKind::AllowOnce,
219 ),
220 acp::PermissionOption::new(
221 "reject",
222 "Reject",
223 acp::PermissionOptionKind::RejectOnce,
224 ),
225 acp::PermissionOption::new(
226 "always_allow",
227 "Always Allow",
228 acp::PermissionOptionKind::AllowAlways,
229 ),
230 ];
231
232 let request =
233 acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
234
235 self.pending_approvals.lock().unwrap().insert(
237 rpc_id.clone(),
238 PendingApproval {
239 engine_approval_id: id.clone(),
240 },
241 );
242
243 let _ = self
244 .tx
245 .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
246 return;
247 }
248
249 if matches!(event, EngineEvent::LoopCapReached { .. }) {
251 let _ = self.cmd_tx.try_send(EngineCommand::LoopDecision {
252 action: koda_core::loop_guard::LoopContinuation::Continue200,
253 });
254 return;
255 }
256
257 if let EngineEvent::AskUserRequest { ref id, .. } = event {
259 let _ = self.cmd_tx.try_send(EngineCommand::AskUserResponse {
260 id: id.clone(),
261 answer: String::new(),
262 });
263 return;
264 }
265
266 if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
268 let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
269 }
270 }
271}
272
273pub fn resolve_permission_response(
276 pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
277 rpc_id: &acp::RequestId,
278 outcome: &acp::RequestPermissionOutcome,
279 cmd_tx: &mpsc::Sender<EngineCommand>,
280) -> bool {
281 let pending = pending_approvals.lock().unwrap().remove(rpc_id);
282 if let Some(approval) = pending {
283 let decision = match outcome {
284 acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
285 acp::RequestPermissionOutcome::Selected(selected) => {
286 match selected.option_id.0.as_ref() {
287 "approve" => ApprovalDecision::Approve,
288 _ => ApprovalDecision::Reject,
289 }
290 }
291 _ => ApprovalDecision::Reject,
292 };
293 let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
294 id: approval.engine_approval_id,
295 decision,
296 });
297 true
298 } else {
299 false
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    // A text delta becomes an agent message chunk tagged with the session id.
    #[test]
    fn test_text_delta() {
        let event = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let notification = engine_event_to_acp(&event, "session-1").unwrap();

        assert_eq!(notification.session_id, "session-1".to_string().into());
        match notification.update {
            acp::SessionUpdate::AgentMessageChunk(chunk) => {
                let block = chunk.content;
                match block {
                    acp::ContentBlock::Text(text_content) => {
                        assert_eq!(text_content.text, "hello");
                    }
                    _ => panic!("Expected text block"),
                }
            }
            _ => panic!("Expected AgentMessageChunk"),
        }
    }

    // A thinking delta becomes an agent thought chunk.
    #[test]
    fn test_thinking_delta() {
        let event = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
                acp::ContentBlock::Text(tc) => assert_eq!(tc.text, "reasoning..."),
                _ => panic!("Expected text block"),
            },
            _ => panic!("Expected AgentThoughtChunk"),
        }
    }

    // A tool call start carries id, title, mapped kind and in-progress status.
    #[test]
    fn test_tool_call_start() {
        let event = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(tc.title, "Bash");
                assert_eq!(tc.kind, acp::ToolKind::Execute);
                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    // A tool call result becomes a completed update for the same call id.
    #[test]
    fn test_tool_call_result() {
        let event = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    // A sub-agent start is reported as a tool call keyed by the agent name.
    #[test]
    fn test_sub_agent_start() {
        let event = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(tc.kind, acp::ToolKind::Other);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    // A blocked action is reported as a failed update titled "Blocked: ...".
    #[test]
    fn test_action_blocked() {
        let event = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    // Info / warn / error messages become prefixed agent message chunks.
    #[test]
    fn test_info_warn_error() {
        for (event, prefix) in [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ] {
            let notification = engine_event_to_acp(&event, "s1").unwrap();
            match notification.update {
                acp::SessionUpdate::AgentMessageChunk(chunk) => match chunk.content {
                    acp::ContentBlock::Text(tc) => assert!(tc.text.starts_with(prefix)),
                    _ => panic!("Expected text block"),
                },
                _ => panic!("Expected AgentMessageChunk"),
            }
        }
    }

    // Events without a notification form must map to `None`.
    #[test]
    fn test_none_events() {
        let none_events = vec![
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
            },
            EngineEvent::AskUserRequest {
                id: "b".into(),
                question: "Which db?".into(),
                options: vec![],
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
            EngineEvent::TurnStart {
                turn_id: "t1".into(),
            },
            EngineEvent::TurnEnd {
                turn_id: "t1".into(),
                reason: koda_core::engine::event::TurnEndReason::Complete,
            },
            EngineEvent::LoopCapReached {
                cap: 200,
                recent_tools: vec![],
            },
        ];
        for event in none_events {
            assert!(
                engine_event_to_acp(&event, "s1").is_none(),
                "Expected None for {event:?}"
            );
        }
    }

    // Every name handled by `map_tool_kind`, including the aliases, plus the
    // fallback for unknown names.
    #[test]
    fn test_map_tool_kind() {
        assert_eq!(map_tool_kind("Read"), acp::ToolKind::Read);
        assert_eq!(map_tool_kind("Write"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Edit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("NotebookEdit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Bash"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Shell"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Grep"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Glob"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Delete"), acp::ToolKind::Delete);
        assert_eq!(map_tool_kind("WebFetch"), acp::ToolKind::Fetch);
        assert_eq!(map_tool_kind("Think"), acp::ToolKind::Think);
        assert_eq!(map_tool_kind("Unknown"), acp::ToolKind::Other);
    }
}