1use agent_client_protocol_schema as acp;
2use koda_core::engine::sink::EngineSink;
3use koda_core::engine::{ApprovalDecision, EngineCommand, EngineEvent};
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicI64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::sync::mpsc;
8
/// Items placed on the sink's outgoing channel.
#[derive(Debug, Clone)]
pub enum AcpOutgoing {
    /// A session update translated from an engine event.
    Notification(acp::SessionNotification),
    /// A permission request raised for an engine approval request.
    PermissionRequest {
        /// Request id under which the matching pending approval is stored.
        rpc_id: acp::RequestId,
        /// The permission request payload (session, tool call, options).
        request: acp::RequestPermissionRequest,
    },
}
19
20pub fn map_tool_kind(name: &str) -> acp::ToolKind {
22 match name {
23 "Read" => acp::ToolKind::Read,
24 "Write" | "Edit" | "NotebookEdit" => acp::ToolKind::Edit,
25 "Bash" | "Shell" => acp::ToolKind::Execute,
26 "Grep" | "Glob" => acp::ToolKind::Search,
27 "Delete" => acp::ToolKind::Delete,
28 "WebFetch" => acp::ToolKind::Fetch,
29 "Think" => acp::ToolKind::Think,
30 _ => acp::ToolKind::Other,
31 }
32}
33
34pub fn engine_event_to_acp(
39 event: &EngineEvent,
40 session_id: &str,
41) -> Option<acp::SessionNotification> {
42 match event {
43 EngineEvent::TextDelta { text } => {
44 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
45 Some(acp::SessionNotification::new(
46 session_id.to_string(),
47 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
48 ))
49 }
50 EngineEvent::TextDone => None,
51 EngineEvent::ThinkingStart => None,
52 EngineEvent::ThinkingDelta { text } => {
53 let cb = acp::ContentBlock::Text(acp::TextContent::new(text.clone()));
54 Some(acp::SessionNotification::new(
55 session_id.to_string(),
56 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(cb)),
57 ))
58 }
59 EngineEvent::ThinkingDone => None,
60 EngineEvent::ResponseStart => None,
61
62 EngineEvent::ToolCallStart { id, name, args, .. } => {
63 let tc = acp::ToolCall::new(id.clone(), name.clone())
64 .kind(map_tool_kind(name))
65 .status(acp::ToolCallStatus::InProgress)
66 .raw_input(Some(args.clone()));
67 Some(acp::SessionNotification::new(
68 session_id.to_string(),
69 acp::SessionUpdate::ToolCall(tc),
70 ))
71 }
72
73 EngineEvent::ToolCallResult {
74 id,
75 name: _,
76 output,
77 } => {
78 let content = vec![acp::ToolCallContent::Content(acp::Content::new(
79 acp::ContentBlock::Text(acp::TextContent::new(output.clone())),
80 ))];
81 let fields = acp::ToolCallUpdateFields::new()
82 .status(acp::ToolCallStatus::Completed)
83 .content(content);
84 let update = acp::ToolCallUpdate::new(id.clone(), fields);
85 Some(acp::SessionNotification::new(
86 session_id.to_string(),
87 acp::SessionUpdate::ToolCallUpdate(update),
88 ))
89 }
90
91 EngineEvent::SubAgentStart { agent_name } => {
92 let tc = acp::ToolCall::new(agent_name.clone(), format!("Sub-agent: {agent_name}"))
93 .kind(acp::ToolKind::Other)
94 .status(acp::ToolCallStatus::InProgress);
95 Some(acp::SessionNotification::new(
96 session_id.to_string(),
97 acp::SessionUpdate::ToolCall(tc),
98 ))
99 }
100
101 EngineEvent::SubAgentEnd { agent_name } => {
102 let fields = acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed);
103 let update = acp::ToolCallUpdate::new(agent_name.clone(), fields);
104 Some(acp::SessionNotification::new(
105 session_id.to_string(),
106 acp::SessionUpdate::ToolCallUpdate(update),
107 ))
108 }
109
110 EngineEvent::ApprovalRequest { .. } => None,
112
113 EngineEvent::ActionBlocked {
114 tool_name: _,
115 detail,
116 ..
117 } => {
118 let fields = acp::ToolCallUpdateFields::new()
119 .status(acp::ToolCallStatus::Failed)
120 .title(format!("Blocked: {detail}"));
121 let update = acp::ToolCallUpdate::new("blocked".to_string(), fields);
122 Some(acp::SessionNotification::new(
123 session_id.to_string(),
124 acp::SessionUpdate::ToolCallUpdate(update),
125 ))
126 }
127
128 EngineEvent::StatusUpdate { .. } => None,
129 EngineEvent::Footer { .. } => None,
130 EngineEvent::SpinnerStart { .. } => None,
131 EngineEvent::SpinnerStop => None,
132
133 EngineEvent::Info { message } => {
134 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[info] {message}")));
135 Some(acp::SessionNotification::new(
136 session_id.to_string(),
137 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
138 ))
139 }
140 EngineEvent::Warn { message } => {
141 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[warn] {message}")));
142 Some(acp::SessionNotification::new(
143 session_id.to_string(),
144 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
145 ))
146 }
147 EngineEvent::Error { message } => {
148 let cb = acp::ContentBlock::Text(acp::TextContent::new(format!("[error] {message}")));
149 Some(acp::SessionNotification::new(
150 session_id.to_string(),
151 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(cb)),
152 ))
153 }
154 }
155}
156
/// Bookkeeping for a permission request that has not been answered yet.
///
/// Derives `Debug` and `Clone` so the public type can be logged and copied
/// by callers.
#[derive(Debug, Clone)]
pub struct PendingApproval {
    /// Id of the engine approval request that the eventual decision is sent back under.
    pub engine_approval_id: String,
}
162
/// Engine sink that turns engine events into `AcpOutgoing` items for one session.
pub struct AcpSink {
    /// Session id stamped on every notification and permission request.
    session_id: String,
    /// Channel onto which outgoing items are pushed without blocking.
    tx: mpsc::Sender<AcpOutgoing>,
    // NOTE(review): the reason for suppressing the dead_code lint is not
    // recorded here — confirm whether it is still required.
    #[allow(dead_code)]
    /// Sender for commands going back to the engine.
    cmd_tx: mpsc::Sender<EngineCommand>,
    /// Approvals awaiting an answer, keyed by the request id of the permission request.
    pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
    /// Counter from which numeric request ids are allocated.
    next_rpc_id: Arc<AtomicI64>,
}
175
176impl AcpSink {
177 pub fn new(
178 session_id: String,
179 tx: mpsc::Sender<AcpOutgoing>,
180 cmd_tx: mpsc::Sender<EngineCommand>,
181 pending_approvals: Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
182 next_rpc_id: Arc<AtomicI64>,
183 ) -> Self {
184 Self {
185 session_id,
186 tx,
187 cmd_tx,
188 pending_approvals,
189 next_rpc_id,
190 }
191 }
192}
193
194impl EngineSink for AcpSink {
195 fn emit(&self, event: EngineEvent) {
196 if let EngineEvent::ApprovalRequest {
198 ref id,
199 ref tool_name,
200 ref detail,
201 ..
202 } = event
203 {
204 let rpc_id_num = self.next_rpc_id.fetch_add(1, Ordering::Relaxed);
205 let rpc_id = acp::RequestId::Number(rpc_id_num);
206
207 let tc_fields = acp::ToolCallUpdateFields::new()
209 .status(acp::ToolCallStatus::Pending)
210 .title(detail.clone());
211 let tc_update = acp::ToolCallUpdate::new(tool_name.clone(), tc_fields);
212
213 let options = vec![
214 acp::PermissionOption::new(
215 "approve",
216 "Approve",
217 acp::PermissionOptionKind::AllowOnce,
218 ),
219 acp::PermissionOption::new(
220 "reject",
221 "Reject",
222 acp::PermissionOptionKind::RejectOnce,
223 ),
224 acp::PermissionOption::new(
225 "always_allow",
226 "Always Allow",
227 acp::PermissionOptionKind::AllowAlways,
228 ),
229 ];
230
231 let request =
232 acp::RequestPermissionRequest::new(self.session_id.clone(), tc_update, options);
233
234 self.pending_approvals.lock().unwrap().insert(
236 rpc_id.clone(),
237 PendingApproval {
238 engine_approval_id: id.clone(),
239 },
240 );
241
242 let _ = self
243 .tx
244 .try_send(AcpOutgoing::PermissionRequest { rpc_id, request });
245 return;
246 }
247
248 if let Some(notification) = engine_event_to_acp(&event, &self.session_id) {
250 let _ = self.tx.try_send(AcpOutgoing::Notification(notification));
251 }
252 }
253}
254
255pub fn resolve_permission_response(
258 pending_approvals: &Arc<Mutex<HashMap<acp::RequestId, PendingApproval>>>,
259 rpc_id: &acp::RequestId,
260 outcome: &acp::RequestPermissionOutcome,
261 cmd_tx: &mpsc::Sender<EngineCommand>,
262) -> bool {
263 let pending = pending_approvals.lock().unwrap().remove(rpc_id);
264 if let Some(approval) = pending {
265 let decision = match outcome {
266 acp::RequestPermissionOutcome::Cancelled => ApprovalDecision::Reject,
267 acp::RequestPermissionOutcome::Selected(selected) => {
268 match selected.option_id.0.as_ref() {
269 "approve" => ApprovalDecision::Approve,
270 "always_allow" => ApprovalDecision::AlwaysAllow,
271 _ => ApprovalDecision::Reject,
272 }
273 }
274 _ => ApprovalDecision::Reject,
275 };
276 let _ = cmd_tx.try_send(EngineCommand::ApprovalResponse {
277 id: approval.engine_approval_id,
278 decision,
279 });
280 true
281 } else {
282 false
283 }
284}
285
/// Unit tests for the event mapping, the tool-kind table and permission resolution.
#[cfg(test)]
mod tests {
    use super::*;

    /// A text delta becomes an agent message chunk carrying the same text.
    #[test]
    fn test_text_delta() {
        let event = EngineEvent::TextDelta {
            text: "hello".into(),
        };
        let notification = engine_event_to_acp(&event, "session-1").unwrap();

        assert_eq!(notification.session_id, "session-1".to_string().into());
        match notification.update {
            acp::SessionUpdate::AgentMessageChunk(chunk) => {
                let block = chunk.content;
                match block {
                    acp::ContentBlock::Text(text_content) => {
                        assert_eq!(text_content.text, "hello");
                    }
                    _ => panic!("Expected text block"),
                }
            }
            _ => panic!("Expected AgentMessageChunk"),
        }
    }

    /// A thinking delta becomes an agent thought chunk carrying the same text.
    #[test]
    fn test_thinking_delta() {
        let event = EngineEvent::ThinkingDelta {
            text: "reasoning...".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
                acp::ContentBlock::Text(tc) => assert_eq!(tc.text, "reasoning..."),
                _ => panic!("Expected text block"),
            },
            _ => panic!("Expected AgentThoughtChunk"),
        }
    }

    /// A tool call start opens an in-progress tool call with the mapped kind.
    #[test]
    fn test_tool_call_start() {
        let event = EngineEvent::ToolCallStart {
            id: "call_1".into(),
            name: "Bash".into(),
            args: serde_json::json!({"command": "ls"}),
            is_sub_agent: false,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(tc.title, "Bash");
                assert_eq!(tc.kind, acp::ToolKind::Execute);
                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    /// A tool call result completes the tool call with the same id.
    #[test]
    fn test_tool_call_result() {
        let event = EngineEvent::ToolCallResult {
            id: "call_1".into(),
            name: "Read".into(),
            output: "file contents".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// A sub-agent start is surfaced as an in-progress tool call keyed by the agent name.
    #[test]
    fn test_sub_agent_start() {
        let event = EngineEvent::SubAgentStart {
            agent_name: "reviewer".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCall(tc) => {
                assert_eq!(tc.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(tc.title, "Sub-agent: reviewer");
                assert_eq!(tc.kind, acp::ToolKind::Other);
                assert_eq!(tc.status, acp::ToolCallStatus::InProgress);
            }
            _ => panic!("Expected ToolCall"),
        }
    }

    /// A sub-agent end completes the tool call that the start opened (same id).
    #[test]
    fn test_sub_agent_end() {
        let event = EngineEvent::SubAgentEnd {
            agent_name: "reviewer".into(),
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "reviewer");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Completed));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// A blocked action is reported as a failed update titled with the detail.
    #[test]
    fn test_action_blocked() {
        let event = EngineEvent::ActionBlocked {
            tool_name: "Bash".into(),
            detail: "rm -rf /".into(),
            preview: None,
        };
        let notification = engine_event_to_acp(&event, "s1").unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::Failed));
                assert_eq!(update.fields.title, Some("Blocked: rm -rf /".to_string()));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// Info, warn and error events become message chunks with a severity prefix.
    #[test]
    fn test_info_warn_error() {
        for (event, prefix) in [
            (
                EngineEvent::Info {
                    message: "hello".into(),
                },
                "[info]",
            ),
            (
                EngineEvent::Warn {
                    message: "watch out".into(),
                },
                "[warn]",
            ),
            (
                EngineEvent::Error {
                    message: "oops".into(),
                },
                "[error]",
            ),
        ] {
            let notification = engine_event_to_acp(&event, "s1").unwrap();
            match notification.update {
                acp::SessionUpdate::AgentMessageChunk(chunk) => match chunk.content {
                    acp::ContentBlock::Text(tc) => assert!(tc.text.starts_with(prefix)),
                    _ => panic!("Expected text block"),
                },
                _ => panic!("Expected AgentMessageChunk"),
            }
        }
    }

    /// Events without an ACP counterpart produce no notification.
    #[test]
    fn test_none_events() {
        let none_events = vec![
            EngineEvent::TextDone,
            EngineEvent::ThinkingStart,
            EngineEvent::ThinkingDone,
            EngineEvent::ResponseStart,
            EngineEvent::ApprovalRequest {
                id: "a".into(),
                tool_name: "Bash".into(),
                detail: "cmd".into(),
                preview: None,
                whitelist_hint: None,
            },
            EngineEvent::StatusUpdate {
                model: "m".into(),
                provider: "p".into(),
                context_pct: 0.5,
                approval_mode: "normal".into(),
                active_tools: 0,
            },
            EngineEvent::Footer {
                prompt_tokens: 0,
                completion_tokens: 0,
                cache_read_tokens: 0,
                thinking_tokens: 0,
                total_chars: 0,
                elapsed_ms: 0,
                rate: 0.0,
                context: String::new(),
            },
            EngineEvent::SpinnerStart {
                message: "x".into(),
            },
            EngineEvent::SpinnerStop,
        ];
        for event in none_events {
            assert!(
                engine_event_to_acp(&event, "s1").is_none(),
                "Expected None for {event:?}"
            );
        }
    }

    /// Every listed tool name, including the aliases, maps to its ACP kind.
    #[test]
    fn test_map_tool_kind() {
        assert_eq!(map_tool_kind("Read"), acp::ToolKind::Read);
        assert_eq!(map_tool_kind("Write"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Edit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("NotebookEdit"), acp::ToolKind::Edit);
        assert_eq!(map_tool_kind("Bash"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Shell"), acp::ToolKind::Execute);
        assert_eq!(map_tool_kind("Grep"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Glob"), acp::ToolKind::Search);
        assert_eq!(map_tool_kind("Delete"), acp::ToolKind::Delete);
        assert_eq!(map_tool_kind("WebFetch"), acp::ToolKind::Fetch);
        assert_eq!(map_tool_kind("Think"), acp::ToolKind::Think);
        assert_eq!(map_tool_kind("Unknown"), acp::ToolKind::Other);
    }

    /// A response for an id that was never registered is reported as unmatched.
    #[test]
    fn test_resolve_permission_response_unknown_id() {
        let pending = Arc::new(Mutex::new(HashMap::new()));
        let (cmd_tx, _cmd_rx) = mpsc::channel::<EngineCommand>(1);

        assert!(!resolve_permission_response(
            &pending,
            &acp::RequestId::Number(1),
            &acp::RequestPermissionOutcome::Cancelled,
            &cmd_tx,
        ));
    }

    /// A cancelled outcome consumes the pending entry and rejects the approval.
    #[test]
    fn test_resolve_permission_response_cancelled_rejects() {
        let pending = Arc::new(Mutex::new(HashMap::new()));
        let rpc_id = acp::RequestId::Number(7);
        pending.lock().unwrap().insert(
            rpc_id.clone(),
            PendingApproval {
                engine_approval_id: "approval-1".into(),
            },
        );
        let (cmd_tx, mut cmd_rx) = mpsc::channel::<EngineCommand>(1);

        assert!(resolve_permission_response(
            &pending,
            &rpc_id,
            &acp::RequestPermissionOutcome::Cancelled,
            &cmd_tx,
        ));
        assert!(pending.lock().unwrap().is_empty());
        match cmd_rx.try_recv() {
            Ok(EngineCommand::ApprovalResponse { id, decision }) => {
                assert_eq!(id, "approval-1");
                assert!(matches!(decision, ApprovalDecision::Reject));
            }
            _ => panic!("Expected ApprovalResponse"),
        }
    }
}