1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5 self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6 PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7 ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServer as RuntimeMcpServer, McpTransport};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23 let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26 let hint = prompt
29 .arguments
30 .as_ref()
31 .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32 .unwrap_or("optional arguments");
33 let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35 let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37 acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<RuntimeMcpServer> {
42 servers
43 .into_iter()
44 .filter_map(|s| {
45 try_map_mcp_server(s).or_else(|| {
46 tracing::warn!("Unsupported ACP MCP server transport, skipping");
47 None
48 })
49 })
50 .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<RuntimeMcpServer> {
54 use McpServer::{Http, Sse, Stdio};
55 match server {
56 Stdio(stdio) => Some(RuntimeMcpServer::new(
57 stdio.name,
58 McpTransport::Stdio {
59 command: stdio.command.to_string_lossy().into_owned(),
60 args: stdio.args,
61 env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62 },
63 false,
64 )),
65
66 Http(http) => Some(RuntimeMcpServer::new(
67 http.name,
68 McpTransport::Http { config: http_config(http.url, &http.headers) },
69 false,
70 )),
71
72 Sse(sse) => Some(RuntimeMcpServer::new(
73 sse.name,
74 McpTransport::Http { config: http_config(sse.url, &sse.headers) },
75 false,
76 )),
77
78 _ => None,
79 }
80}
81
82fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
83 let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
84
85 let mut config = StreamableHttpClientTransportConfig::with_uri(url);
86 if let Some(auth) = auth_header {
87 let token = auth
89 .split_once(' ')
90 .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
91 .map_or(auth.as_str(), |(_, rest)| rest);
92 config = config.auth_header(token.to_string());
93 }
94 config
95}
96
97pub fn map_agent_message_to_session_notification(
99 session_id: SessionId,
100 msg: &AgentMessage,
101) -> Option<SessionNotification> {
102 map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
103}
104
105#[derive(Clone, Copy)]
106enum NotificationMode {
107 Live,
108 Replay,
109}
110
111fn map_agent_message_to_notification(
112 session_id: SessionId,
113 msg: &AgentMessage,
114 mode: NotificationMode,
115) -> Option<SessionNotification> {
116 match msg {
117 AgentMessage::Text { chunk, is_complete, .. } => {
118 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
119 }
120
121 AgentMessage::Thought { chunk, is_complete, .. } => {
122 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
123 }
124
125 AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
126
127 AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
128 Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
129 }
130
131 AgentMessage::ToolResult { result, result_meta, .. } => {
132 Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
133 }
134
135 AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
136
137 AgentMessage::ToolProgress { request, progress, total, message } => {
138 map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
139 }
140
141 AgentMessage::Error { message } => Some(acp::SessionNotification::new(
142 session_id,
143 SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
144 "[Error] {message}"
145 ))))),
146 )),
147
148 AgentMessage::ContextUsageUpdate { .. }
149 | AgentMessage::ContextCleared
150 | AgentMessage::Cancelled { .. }
151 | AgentMessage::Done
152 | AgentMessage::ContextCompactionStarted { .. }
153 | AgentMessage::ContextCompactionResult { .. }
154 | AgentMessage::AutoContinue { .. }
155 | AgentMessage::Retrying { .. }
156 | AgentMessage::ModelSwitched { .. } => None,
157 }
158}
159
160pub enum AgentExtNotification {
164 ContextUsage(ContextUsageParams),
165 ContextCleared(ContextClearedParams),
166 SubAgentProgress(SubAgentProgressParams),
167}
168
169pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
170 match msg {
171 AgentMessage::ContextUsageUpdate {
172 usage_ratio,
173 context_limit,
174 input_tokens,
175 output_tokens,
176 cache_read_tokens,
177 cache_creation_tokens,
178 reasoning_tokens,
179 total_input_tokens,
180 total_output_tokens,
181 total_cache_read_tokens,
182 total_cache_creation_tokens,
183 total_reasoning_tokens,
184 } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
185 usage_ratio: *usage_ratio,
186 context_limit: *context_limit,
187 input_tokens: *input_tokens,
188 output_tokens: *output_tokens,
189 cache_read_tokens: *cache_read_tokens,
190 cache_creation_tokens: *cache_creation_tokens,
191 reasoning_tokens: *reasoning_tokens,
192 total_input_tokens: *total_input_tokens,
193 total_output_tokens: *total_output_tokens,
194 total_cache_read_tokens: *total_cache_read_tokens,
195 total_cache_creation_tokens: *total_cache_creation_tokens,
196 total_reasoning_tokens: *total_reasoning_tokens,
197 })),
198 AgentMessage::ToolProgress { request, message, .. } => {
199 let msg_str = message.as_ref()?;
200 let params = try_parse_sub_agent_progress(msg_str, request)?;
201 Some(AgentExtNotification::SubAgentProgress(params))
202 }
203 AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
204 _ => None,
205 }
206}
207
208pub fn try_extract_plan_notification(
210 session_id: SessionId,
211 result_meta: Option<&ToolResultMeta>,
212) -> Option<SessionNotification> {
213 let plan_meta = result_meta?.plan.as_ref()?;
214 let entries = plan_meta
215 .entries
216 .iter()
217 .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
218 .collect();
219 Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
220}
221
222fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
224 match status {
225 PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
226 PlanMetaStatus::Completed => PlanEntryStatus::Completed,
227 PlanMetaStatus::Pending => PlanEntryStatus::Pending,
228 }
229}
230
231pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
233 match msg {
234 AgentMessage::Cancelled { .. } => StopReason::Cancelled,
235 _ => StopReason::EndTurn,
236 }
237}
238
239fn map_chunk_to_notification(
240 session_id: SessionId,
241 chunk: &str,
242 is_complete: bool,
243 mode: NotificationMode,
244 wrap: fn(ContentChunk) -> SessionUpdate,
245) -> Option<SessionNotification> {
246 match mode {
247 NotificationMode::Live if is_complete => return None,
250 NotificationMode::Replay if !is_complete => return None,
251 NotificationMode::Live | NotificationMode::Replay => {}
252 }
253
254 Some(acp::SessionNotification::new(
255 session_id,
256 wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
257 ))
258}
259
260fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
261 let raw_input = serde_json::from_str(&request.arguments).ok();
262 SessionNotification::new(
263 session_id,
264 SessionUpdate::ToolCall(
265 ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
266 .status(acp::ToolCallStatus::InProgress)
267 .raw_input(raw_input),
268 ),
269 )
270}
271
272fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
273 serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
274}
275
276fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
277 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
278
279 SessionNotification::new(
280 session_id,
281 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
282 )
283}
284
285fn humanize_tool_name(name: &str) -> String {
288 let base = name.split("__").last().unwrap_or(name);
289 let mut result = base.replace('_', " ");
290 if let Some(first) = result.get_mut(0..1) {
291 first.make_ascii_uppercase();
292 }
293 result
294}
295
296fn map_tool_result_to_notification(
297 session_id: SessionId,
298 result: &ToolCallResult,
299 result_meta: Option<&ToolResultMeta>,
300) -> SessionNotification {
301 let mut content =
302 vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
303
304 if let Some(rm) = result_meta
305 && let Some(fd) = &rm.file_diff
306 {
307 let mut diff = Diff::new(&fd.path, &fd.new_text);
308 if let Some(old) = &fd.old_text {
309 diff = diff.old_text(old.clone());
310 }
311 content.push(ToolCallContent::Diff(diff));
312 }
313
314 let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
315
316 if let Some(rm) = result_meta {
317 fields = fields.title(&rm.display.title);
318 }
319
320 let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
321
322 if let Some(rm) = result_meta
323 && !rm.display.value.is_empty()
324 {
325 let mut meta_map = serde_json::Map::new();
326 meta_map.insert("display_value".into(), rm.display.value.clone().into());
327 update = update.meta(meta_map);
328 }
329
330 SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
331}
332
333fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
334 SessionNotification::new(
335 session_id,
336 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
337 ToolCallId::new(error.id.clone()),
338 ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
339 Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
340 )]),
341 )),
342 )
343}
344
345fn map_tool_progress_to_notification(
346 session_id: SessionId,
347 request: &ToolCallRequest,
348 progress: f64,
349 total: Option<f64>,
350 message: Option<&String>,
351) -> Option<SessionNotification> {
352 tracing::debug!("Tool progress: {message:?}");
353
354 if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
355 return None;
356 }
357
358 if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
359 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
360
361 let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
362
363 if !result_meta.display.value.is_empty() {
364 let mut meta_map = serde_json::Map::new();
365 meta_map.insert("display_value".into(), result_meta.display.value.into());
366 update = update.meta(meta_map);
367 }
368
369 return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
370 }
371
372 let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
373 let progress_text = message
374 .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
375
376 Some(SessionNotification::new(
377 session_id,
378 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
379 ToolCallId::new(request.id.clone()),
380 ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
381 Content::new(ContentBlock::Text(TextContent::new(progress_text))),
382 )]),
383 )),
384 ))
385}
386
387pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
389 for notif in replay_events_to_notifications(events, session_id) {
390 if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
391 tracing::error!("Failed to send replay notification: {e:?}");
392 }
393 }
394}
395
396pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
397 let mut out = Vec::new();
398 for event in events {
399 match event {
400 SessionEvent::User(UserEvent::Message { content }) => {
401 for block in content {
402 out.push(SessionNotification::new(
403 session_id.clone(),
404 SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
405 ));
406 }
407 }
408 SessionEvent::Agent(message) => {
409 out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
410 }
411 SessionEvent::User(_) => {}
412 }
413 }
414 out
415}
416
417fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
418 match block {
419 llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
420 llm::ContentBlock::Image { data, mime_type } => {
421 ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
422 }
423 llm::ContentBlock::Audio { data, mime_type } => {
424 ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
425 }
426 }
427}
428
429fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
430 serde_json::from_str::<ToolResultMeta>(message).ok()
431}
432
433fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
435 let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
436
437 Some(SubAgentProgressParams {
438 parent_tool_id: request.id.clone(),
439 task_id: payload.task_id,
440 agent_name: payload.agent_name,
441 event: (&payload.event).into(),
442 })
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use acp_utils::notifications::SubAgentEvent;
449 use llm::ToolCallRequest;
450
451 #[test]
452 fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
453 let session_id = acp::SessionId::new("test-session");
454
455 let payload = SubAgentProgressPayload {
456 task_id: "task_1".to_string(),
457 agent_name: "sub-agent".to_string(),
458 event: AgentMessage::Text {
459 message_id: "msg_1".to_string(),
460 chunk: "Hello".to_string(),
461 is_complete: false,
462 model_name: "TestModel".to_string(),
463 },
464 };
465 let serialized_msg = serde_json::to_string(&payload).unwrap();
466
467 let tool_progress = AgentMessage::ToolProgress {
468 request: ToolCallRequest {
469 id: "call_123".to_string(),
470 name: "plugins__spawn_subagent".to_string(),
471 arguments: "{}".to_string(),
472 },
473 progress: 42.0,
474 total: Some(100.0),
475 message: Some(serialized_msg.clone()),
476 };
477
478 let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
479
480 assert!(notification.is_none());
481
482 let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
483 match agent_notif {
484 AgentExtNotification::SubAgentProgress(params) => {
485 assert_eq!(params.parent_tool_id, "call_123");
486 assert_eq!(params.task_id, "task_1");
487 assert_eq!(params.agent_name, "sub-agent");
488 assert!(matches!(params.event, SubAgentEvent::Other));
489 }
490 _ => panic!("expected SubAgentProgress"),
491 }
492 }
493
494 #[test]
495 fn replay_emits_user_media_chunks_in_order() {
496 let session_id = acp::SessionId::new("test-session");
497 let events = vec![SessionEvent::User(UserEvent::Message {
498 content: vec![
499 llm::ContentBlock::text("hello"),
500 llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
501 llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
502 ],
503 })];
504
505 let notifications = replay_events_to_notifications(&events, &session_id);
506 let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
507 assert!(matches!(
508 &updates[0],
509 acp::SessionUpdate::UserMessageChunk(chunk)
510 if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
511 ));
512 assert!(matches!(
513 &updates[1],
514 acp::SessionUpdate::UserMessageChunk(chunk)
515 if matches!(&chunk.content, acp::ContentBlock::Image(_))
516 ));
517 assert!(matches!(
518 &updates[2],
519 acp::SessionUpdate::UserMessageChunk(chunk)
520 if matches!(&chunk.content, acp::ContentBlock::Audio(_))
521 ));
522 }
523
524 #[test]
525 fn test_thought_maps_to_agent_thought_chunk() {
526 let session_id = acp::SessionId::new("test-session");
527 let thought = AgentMessage::Thought {
528 message_id: "msg_1".to_string(),
529 chunk: "thinking...".to_string(),
530 is_complete: false,
531 model_name: "TestModel".to_string(),
532 };
533
534 let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");
535
536 match notification.update {
537 acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
538 acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
539 other => panic!("Expected text content, got {other:?}"),
540 },
541 other => panic!("Expected AgentThoughtChunk, got {other:?}"),
542 }
543 }
544
545 #[test]
546 fn test_tool_call_maps_to_tool_call_notification() {
547 let session_id = acp::SessionId::new("test-session");
548 let message = AgentMessage::ToolCall {
549 request: ToolCallRequest {
550 id: "call_1".to_string(),
551 name: "coding__read_file".to_string(),
552 arguments: "{}".to_string(),
553 },
554 model_name: "TestModel".to_string(),
555 };
556
557 let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
558
559 match notification.update {
560 acp::SessionUpdate::ToolCall(tool_call) => {
561 assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
562 assert_eq!(tool_call.title, "Read file");
563 assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
564 }
565 other => panic!("Expected ToolCall, got {other:?}"),
566 }
567 }
568
569 #[test]
570 fn test_tool_call_update_maps_to_tool_call_update_notification() {
571 let session_id = acp::SessionId::new("test-session");
572 let message = AgentMessage::ToolCallUpdate {
573 tool_call_id: "call_1".to_string(),
574 chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
575 model_name: "TestModel".to_string(),
576 };
577
578 let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
579
580 match notification.update {
581 acp::SessionUpdate::ToolCallUpdate(update) => {
582 assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
583 assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
584 assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
585 }
586 other => panic!("Expected ToolCallUpdate, got {other:?}"),
587 }
588 }
589
590 #[test]
591 fn test_tool_call_update_has_same_live_and_replay_mapping() {
592 let session_id = acp::SessionId::new("test-session");
593 let message = AgentMessage::ToolCallUpdate {
594 tool_call_id: "call_1".to_string(),
595 chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
596 model_name: "TestModel".to_string(),
597 };
598
599 let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
600 .expect("live notification");
601 let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
602 .expect("replay notification");
603
604 match (live.update, replay.update) {
605 (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
606 assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
607 assert_eq!(live.fields.status, replay.fields.status);
608 assert_eq!(live.fields.raw_input, replay.fields.raw_input);
609 }
610 other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
611 }
612 }
613
614 #[test]
615 fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
616 let cases: Vec<(AgentMessage, &str)> = vec![
617 (
618 AgentMessage::Text {
619 message_id: "msg_1".to_string(),
620 chunk: "done".to_string(),
621 is_complete: true,
622 model_name: "TestModel".to_string(),
623 },
624 "done",
625 ),
626 (
627 AgentMessage::Thought {
628 message_id: "msg_1".to_string(),
629 chunk: "final reasoning".to_string(),
630 is_complete: true,
631 model_name: "TestModel".to_string(),
632 },
633 "final reasoning",
634 ),
635 ];
636
637 for (message, expected_text) in cases {
638 let session_id = acp::SessionId::new("test-session");
639 assert!(
640 map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
641 "live mode should skip completed chunk"
642 );
643
644 let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
645 .expect("replay notification");
646
647 match notification.update {
648 acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
649 match chunk.content {
650 acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
651 other => panic!("Expected text content, got {other:?}"),
652 }
653 }
654 other => panic!("Expected chunk update, got {other:?}"),
655 }
656 }
657 }
658
659 #[test]
660 fn test_context_cleared_maps_to_agent_notification() {
661 let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
662 .expect("context cleared should emit agent notification");
663 assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
664 }
665
666 #[test]
667 fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
668 let session_id = acp::SessionId::new("test-session");
669
670 let tool_progress = AgentMessage::ToolProgress {
672 request: ToolCallRequest {
673 id: "call_456".to_string(),
674 name: "some_tool".to_string(),
675 arguments: "{}".to_string(),
676 },
677 progress: 50.0,
678 total: None,
679 message: Some("not valid json".to_string()),
680 };
681
682 let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
683
684 assert!(notification.is_some());
685
686 let notification = notification.unwrap();
688 match notification.update {
689 acp::SessionUpdate::ToolCallUpdate(update) => {
690 if let Some(content) = &update.fields.content
691 && let acp::ToolCallContent::Content(c) = &content[0]
692 && let acp::ContentBlock::Text(text) = &c.content
693 {
694 assert!(text.text.contains("not valid json"));
696 }
697 }
698 _ => panic!("Expected ToolCallUpdate"),
699 }
700 }
701
702 #[test]
703 fn test_map_acp_stdio_server() {
704 let server = acp::McpServer::Stdio(
705 acp::McpServerStdio::new("my-server", "/usr/bin/server")
706 .args(vec!["--port".into(), "8080".into()])
707 .env(vec![acp::EnvVariable::new("FOO", "bar")]),
708 );
709
710 let configs = map_acp_mcp_servers(vec![server]);
711 assert_eq!(configs.len(), 1);
712
713 match &configs[0].transport {
714 McpTransport::Stdio { command, args, env } => {
715 assert_eq!(configs[0].name, "my-server");
716 assert_eq!(command, "/usr/bin/server");
717 assert_eq!(args, &["--port", "8080"]);
718 assert_eq!(env.get("FOO").unwrap(), "bar");
719 }
720 other => panic!("Expected Stdio, got {other:?}"),
721 }
722 }
723
724 #[test]
725 fn test_map_acp_http_server() {
726 let server = acp::McpServer::Http(
727 acp::McpServerHttp::new("http-server", "https://example.com/mcp")
728 .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
729 );
730
731 let configs = map_acp_mcp_servers(vec![server]);
732 assert_eq!(configs.len(), 1);
733
734 match &configs[0].transport {
735 McpTransport::Http { config } => {
736 assert_eq!(configs[0].name, "http-server");
737 assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
738 assert_eq!(config.auth_header.as_deref(), Some("token123"));
739 }
740 other => panic!("Expected Http, got {other:?}"),
741 }
742 }
743
744 #[test]
745 fn test_http_auth_header_strips_bearer_case_insensitively() {
746 let cases = [
747 ("Bearer token123", "token123"),
748 ("bearer token123", "token123"),
749 ("BEARER token123", "token123"),
750 ("bEaReR token123", "token123"),
751 ("Token foo", "Token foo"),
754 ("token123", "token123"),
755 ];
756
757 for (input, expected) in cases {
758 let server = acp::McpServer::Http(
759 acp::McpServerHttp::new("http-server", "https://example.com/mcp")
760 .headers(vec![acp::HttpHeader::new("Authorization", input)]),
761 );
762 let configs = map_acp_mcp_servers(vec![server]);
763 match &configs[0].transport {
764 McpTransport::Http { config } => {
765 assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
766 }
767 other => panic!("Expected Http, got {other:?}"),
768 }
769 }
770 }
771
772 #[test]
773 fn test_map_acp_sse_server() {
774 let server = acp::McpServer::Sse(acp::McpServerSse::new("sse-server", "https://example.com/sse"));
775
776 let configs = map_acp_mcp_servers(vec![server]);
777 assert_eq!(configs.len(), 1);
778
779 match &configs[0].transport {
780 McpTransport::Http { config } => {
781 assert_eq!(configs[0].name, "sse-server");
782 assert_eq!(config.uri.as_ref(), "https://example.com/sse");
783 assert_eq!(config.auth_header, None);
784 }
785 other => panic!("Expected Http, got {other:?}"),
786 }
787 }
788
789 #[test]
790 fn test_humanize_tool_name() {
791 assert_eq!(humanize_tool_name("coding__read_file"), "Read file");
792 assert_eq!(humanize_tool_name("read_file"), "Read file");
793 assert_eq!(humanize_tool_name("bash"), "Bash");
794 assert_eq!(humanize_tool_name("plugins__coding__read_file"), "Read file");
795 }
796
797 #[test]
798 fn test_result_with_result_meta_sets_meta() {
799 use mcp_utils::display_meta::ToolDisplayMeta;
800
801 let session_id = acp::SessionId::new("test-session");
802 let result = ToolCallResult {
803 id: "call_1".to_string(),
804 name: "coding__read_file".to_string(),
805 arguments: "{}".to_string(),
806 result: "file contents".to_string(),
807 };
808 let rm: ToolResultMeta = ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines").into();
809
810 let notification = map_tool_result_to_notification(session_id, &result, Some(&rm));
811 match notification.update {
812 acp::SessionUpdate::ToolCallUpdate(update) => {
813 assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
814 let meta = update.meta.expect("meta should be present");
815 assert_eq!(
816 meta.get("display_value").and_then(|v| v.as_str()),
817 Some("Cargo.toml, 156 lines"),
818 "display_value should be a flat key in _meta"
819 );
820 assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
821 }
822 other => panic!("Expected ToolCallUpdate, got {other:?}"),
823 }
824 }
825
826 #[test]
827 fn test_result_without_result_meta() {
828 let session_id = acp::SessionId::new("test-session");
829 let result = ToolCallResult {
830 id: "call_1".to_string(),
831 name: "external__some_tool".to_string(),
832 arguments: "{}".to_string(),
833 result: "ok".to_string(),
834 };
835
836 let notification = map_tool_result_to_notification(session_id, &result, None);
837 match notification.update {
838 acp::SessionUpdate::ToolCallUpdate(update) => {
839 assert!(update.fields.title.is_none());
840 assert!(update.meta.is_none());
841 }
842 other => panic!("Expected ToolCallUpdate, got {other:?}"),
843 }
844 }
845
846 #[test]
847 fn test_plan_notification_extracted_from_result_meta() {
848 use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};
849
850 let session_id = acp::SessionId::new("test-session");
851 let meta = ToolResultMeta::with_plan(
852 ToolDisplayMeta::new("Todo", "Research AI agents"),
853 PlanMeta {
854 entries: vec![
855 PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
856 PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
857 ],
858 },
859 );
860
861 let notification = try_extract_plan_notification(session_id, Some(&meta)).expect("should produce plan");
862 match notification.update {
863 acp::SessionUpdate::Plan(plan) => {
864 assert_eq!(plan.entries.len(), 2);
865 assert_eq!(plan.entries[0].content, "Research AI agents");
866 assert_eq!(plan.entries[0].status, acp::PlanEntryStatus::InProgress);
867 assert_eq!(plan.entries[1].content, "Write tests");
868 assert_eq!(plan.entries[1].status, acp::PlanEntryStatus::Pending);
869 }
870 other => panic!("Expected Plan, got {other:?}"),
871 }
872 }
873
874 #[test]
875 fn test_plan_notification_none_when_no_plan_or_no_meta() {
876 use mcp_utils::display_meta::ToolDisplayMeta;
877
878 let sid = acp::SessionId::new("test-session");
879 let meta: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
880 assert!(try_extract_plan_notification(sid.clone(), Some(&meta)).is_none());
881 assert!(try_extract_plan_notification(sid, None).is_none());
882 }
883
884 #[test]
885 fn test_tool_progress_with_display_meta_emits_meta_update() {
886 use mcp_utils::display_meta::ToolDisplayMeta;
887
888 let session_id = acp::SessionId::new("test-session");
889 let meta = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
890 let serialized = serde_json::to_string(&meta).unwrap();
891
892 let request = ToolCallRequest {
893 id: "call_789".to_string(),
894 name: "coding__read_file".to_string(),
895 arguments: "{}".to_string(),
896 };
897
898 let notification = map_tool_progress_to_notification(session_id, &request, 0.0, None, Some(&serialized))
899 .expect("should produce notification");
900
901 match notification.update {
902 acp::SessionUpdate::ToolCallUpdate(update) => {
903 assert_eq!(&*update.tool_call_id.0, "call_789");
904 assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
905 let meta_map = update.meta.expect("meta should be present");
906 assert_eq!(
907 meta_map.get("display_value").and_then(|v| v.as_str()),
908 Some("main.rs"),
909 "display_value should be a flat key in _meta"
910 );
911 assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");
912 assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
913 assert!(update.fields.content.is_none());
915 }
916 other => panic!("Expected ToolCallUpdate, got {other:?}"),
917 }
918 }
919
920 #[tokio::test(flavor = "current_thread")]
921 async fn replay_to_client_forwards_each_event_as_session_notification() {
922 use acp_utils::testing::test_connection;
923 use llm::ContentBlock as LlmContentBlock;
924 use tokio::task::LocalSet;
925
926 LocalSet::new()
927 .run_until(async {
928 let (cx, mut peer) = test_connection().await;
929 let session_id = acp::SessionId::new("test-session");
930 let events = vec![SessionEvent::User(UserEvent::Message {
931 content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
932 })];
933
934 replay_to_client(&events, &cx, &session_id).await;
935
936 for _ in 0..2 {
937 let notif = peer.next_session_notification().await;
938 assert_eq!(notif.session_id, session_id);
939 assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
940 }
941 })
942 .await;
943 }
944}