1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5 self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6 PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7 ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServerConfig, ServerConfig};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23 let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26 let hint = prompt
29 .arguments
30 .as_ref()
31 .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32 .unwrap_or("optional arguments");
33 let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35 let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37 acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<McpServerConfig> {
42 servers
43 .into_iter()
44 .filter_map(|s| {
45 try_map_mcp_server(s).or_else(|| {
46 tracing::warn!("Unsupported ACP MCP server transport, skipping");
47 None
48 })
49 })
50 .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<McpServerConfig> {
54 use McpServer::{Http, Sse, Stdio};
55 match server {
56 Stdio(stdio) => Some(
57 ServerConfig::Stdio {
58 name: stdio.name,
59 command: stdio.command.to_string_lossy().into_owned(),
60 args: stdio.args,
61 env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62 }
63 .into(),
64 ),
65
66 Http(http) => Some(ServerConfig::Http { name: http.name, config: http_config(http.url, &http.headers) }.into()),
67
68 Sse(sse) => Some(ServerConfig::Http { name: sse.name, config: http_config(sse.url, &sse.headers) }.into()),
69
70 _ => None,
71 }
72}
73
74fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
75 let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
76
77 let mut config = StreamableHttpClientTransportConfig::with_uri(url);
78 if let Some(auth) = auth_header {
79 let token = auth
81 .split_once(' ')
82 .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
83 .map_or(auth.as_str(), |(_, rest)| rest);
84 config = config.auth_header(token.to_string());
85 }
86 config
87}
88
89pub fn map_agent_message_to_session_notification(
91 session_id: SessionId,
92 msg: &AgentMessage,
93) -> Option<SessionNotification> {
94 map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
95}
96
/// Selects how text and thought chunks are filtered when they are mapped to
/// notifications (see `map_chunk_to_notification`).
#[derive(Clone, Copy)]
enum NotificationMode {
    /// Live streaming: chunks flagged `is_complete` are skipped.
    Live,
    /// Session replay: only chunks flagged `is_complete` are emitted.
    Replay,
}
102
103fn map_agent_message_to_notification(
104 session_id: SessionId,
105 msg: &AgentMessage,
106 mode: NotificationMode,
107) -> Option<SessionNotification> {
108 match msg {
109 AgentMessage::Text { chunk, is_complete, .. } => {
110 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
111 }
112
113 AgentMessage::Thought { chunk, is_complete, .. } => {
114 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
115 }
116
117 AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
118
119 AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
120 Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
121 }
122
123 AgentMessage::ToolResult { result, result_meta, .. } => {
124 Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
125 }
126
127 AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
128
129 AgentMessage::ToolProgress { request, progress, total, message } => {
130 map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
131 }
132
133 AgentMessage::Error { message } => Some(acp::SessionNotification::new(
134 session_id,
135 SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
136 "[Error] {message}"
137 ))))),
138 )),
139
140 AgentMessage::ContextUsageUpdate { .. }
141 | AgentMessage::ContextCleared
142 | AgentMessage::Cancelled { .. }
143 | AgentMessage::Done
144 | AgentMessage::ContextCompactionStarted { .. }
145 | AgentMessage::ContextCompactionResult { .. }
146 | AgentMessage::AutoContinue { .. }
147 | AgentMessage::Retrying { .. }
148 | AgentMessage::ModelSwitched { .. } => None,
149 }
150}
151
/// Extension notifications derived from agent messages by
/// `try_into_agent_notification`.
pub enum AgentExtNotification {
    /// Context/token usage figures copied from `AgentMessage::ContextUsageUpdate`.
    ContextUsage(ContextUsageParams),
    /// Produced for `AgentMessage::ContextCleared`.
    ContextCleared(ContextClearedParams),
    /// Sub-agent progress parsed from the message of an `AgentMessage::ToolProgress`.
    SubAgentProgress(SubAgentProgressParams),
}
160
161pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
162 match msg {
163 AgentMessage::ContextUsageUpdate {
164 usage_ratio,
165 context_limit,
166 input_tokens,
167 output_tokens,
168 cache_read_tokens,
169 cache_creation_tokens,
170 reasoning_tokens,
171 total_input_tokens,
172 total_output_tokens,
173 total_cache_read_tokens,
174 total_cache_creation_tokens,
175 total_reasoning_tokens,
176 } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
177 usage_ratio: *usage_ratio,
178 context_limit: *context_limit,
179 input_tokens: *input_tokens,
180 output_tokens: *output_tokens,
181 cache_read_tokens: *cache_read_tokens,
182 cache_creation_tokens: *cache_creation_tokens,
183 reasoning_tokens: *reasoning_tokens,
184 total_input_tokens: *total_input_tokens,
185 total_output_tokens: *total_output_tokens,
186 total_cache_read_tokens: *total_cache_read_tokens,
187 total_cache_creation_tokens: *total_cache_creation_tokens,
188 total_reasoning_tokens: *total_reasoning_tokens,
189 })),
190 AgentMessage::ToolProgress { request, message, .. } => {
191 let msg_str = message.as_ref()?;
192 let params = try_parse_sub_agent_progress(msg_str, request)?;
193 Some(AgentExtNotification::SubAgentProgress(params))
194 }
195 AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
196 _ => None,
197 }
198}
199
200pub fn try_extract_plan_notification(
202 session_id: SessionId,
203 result_meta: Option<&ToolResultMeta>,
204) -> Option<SessionNotification> {
205 let plan_meta = result_meta?.plan.as_ref()?;
206 let entries = plan_meta
207 .entries
208 .iter()
209 .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
210 .collect();
211 Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
212}
213
214fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
216 match status {
217 PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
218 PlanMetaStatus::Completed => PlanEntryStatus::Completed,
219 PlanMetaStatus::Pending => PlanEntryStatus::Pending,
220 }
221}
222
223pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
225 match msg {
226 AgentMessage::Cancelled { .. } => StopReason::Cancelled,
227 _ => StopReason::EndTurn,
228 }
229}
230
231fn map_chunk_to_notification(
232 session_id: SessionId,
233 chunk: &str,
234 is_complete: bool,
235 mode: NotificationMode,
236 wrap: fn(ContentChunk) -> SessionUpdate,
237) -> Option<SessionNotification> {
238 match mode {
239 NotificationMode::Live if is_complete => return None,
242 NotificationMode::Replay if !is_complete => return None,
243 NotificationMode::Live | NotificationMode::Replay => {}
244 }
245
246 Some(acp::SessionNotification::new(
247 session_id,
248 wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
249 ))
250}
251
252fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
253 let raw_input = serde_json::from_str(&request.arguments).ok();
254 SessionNotification::new(
255 session_id,
256 SessionUpdate::ToolCall(
257 ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
258 .status(acp::ToolCallStatus::InProgress)
259 .raw_input(raw_input),
260 ),
261 )
262}
263
264fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
265 serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
266}
267
268fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
269 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
270
271 SessionNotification::new(
272 session_id,
273 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
274 )
275}
276
/// Turns a tool identifier such as `coding__read_file` into a display title such as
/// `Read file`.
///
/// Only the last `__`-separated segment is kept, underscores become spaces, and the
/// first character is upper-cased when it is an ASCII letter.
fn humanize_tool_name(name: &str) -> String {
    // `split` always yields at least one segment, so the fallback is purely defensive.
    let base = name.split("__").last().unwrap_or(name);

    base.chars()
        .enumerate()
        .map(|(index, ch)| match ch {
            '_' => ' ',
            _ if index == 0 => ch.to_ascii_uppercase(),
            _ => ch,
        })
        .collect()
}
287
288fn map_tool_result_to_notification(
289 session_id: SessionId,
290 result: &ToolCallResult,
291 result_meta: Option<&ToolResultMeta>,
292) -> SessionNotification {
293 let mut content =
294 vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
295
296 if let Some(rm) = result_meta
297 && let Some(fd) = &rm.file_diff
298 {
299 let mut diff = Diff::new(&fd.path, &fd.new_text);
300 if let Some(old) = &fd.old_text {
301 diff = diff.old_text(old.clone());
302 }
303 content.push(ToolCallContent::Diff(diff));
304 }
305
306 let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
307
308 if let Some(rm) = result_meta {
309 fields = fields.title(&rm.display.title);
310 }
311
312 let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
313
314 if let Some(rm) = result_meta
315 && !rm.display.value.is_empty()
316 {
317 let mut meta_map = serde_json::Map::new();
318 meta_map.insert("display_value".into(), rm.display.value.clone().into());
319 update = update.meta(meta_map);
320 }
321
322 SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
323}
324
325fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
326 SessionNotification::new(
327 session_id,
328 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
329 ToolCallId::new(error.id.clone()),
330 ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
331 Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
332 )]),
333 )),
334 )
335}
336
337fn map_tool_progress_to_notification(
338 session_id: SessionId,
339 request: &ToolCallRequest,
340 progress: f64,
341 total: Option<f64>,
342 message: Option<&String>,
343) -> Option<SessionNotification> {
344 tracing::debug!("Tool progress: {message:?}");
345
346 if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
347 return None;
348 }
349
350 if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
351 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
352
353 let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
354
355 if !result_meta.display.value.is_empty() {
356 let mut meta_map = serde_json::Map::new();
357 meta_map.insert("display_value".into(), result_meta.display.value.into());
358 update = update.meta(meta_map);
359 }
360
361 return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
362 }
363
364 let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
365 let progress_text = message
366 .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
367
368 Some(SessionNotification::new(
369 session_id,
370 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
371 ToolCallId::new(request.id.clone()),
372 ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
373 Content::new(ContentBlock::Text(TextContent::new(progress_text))),
374 )]),
375 )),
376 ))
377}
378
379pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
381 for notif in replay_events_to_notifications(events, session_id) {
382 if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
383 tracing::error!("Failed to send replay notification: {e:?}");
384 }
385 }
386}
387
388pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
389 let mut out = Vec::new();
390 for event in events {
391 match event {
392 SessionEvent::User(UserEvent::Message { content }) => {
393 for block in content {
394 out.push(SessionNotification::new(
395 session_id.clone(),
396 SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
397 ));
398 }
399 }
400 SessionEvent::Agent(message) => {
401 out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
402 }
403 SessionEvent::User(_) => {}
404 }
405 }
406 out
407}
408
409fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
410 match block {
411 llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
412 llm::ContentBlock::Image { data, mime_type } => {
413 ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
414 }
415 llm::ContentBlock::Audio { data, mime_type } => {
416 ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
417 }
418 }
419}
420
421fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
422 serde_json::from_str::<ToolResultMeta>(message).ok()
423}
424
425fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
427 let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
428
429 Some(SubAgentProgressParams {
430 parent_tool_id: request.id.clone(),
431 task_id: payload.task_id,
432 agent_name: payload.agent_name,
433 event: (&payload.event).into(),
434 })
435}
436
/// Unit tests for the ACP mapping helpers in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use acp_utils::notifications::SubAgentEvent;
    use llm::ToolCallRequest;

    /// A tool-progress message carrying a sub-agent payload must not produce a session
    /// notification, but must produce a `SubAgentProgress` extension notification.
    #[test]
    fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
        let session_id = acp::SessionId::new("test-session");

        let payload = SubAgentProgressPayload {
            task_id: "task_1".to_string(),
            agent_name: "sub-agent".to_string(),
            event: AgentMessage::Text {
                message_id: "msg_1".to_string(),
                chunk: "Hello".to_string(),
                is_complete: false,
                model_name: "TestModel".to_string(),
            },
        };
        let serialized_msg = serde_json::to_string(&payload).unwrap();

        let tool_progress = AgentMessage::ToolProgress {
            request: ToolCallRequest {
                id: "call_123".to_string(),
                name: "plugins__spawn_subagent".to_string(),
                arguments: "{}".to_string(),
            },
            progress: 42.0,
            total: Some(100.0),
            message: Some(serialized_msg.clone()),
        };

        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);

        assert!(notification.is_none());

        let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
        match agent_notif {
            AgentExtNotification::SubAgentProgress(params) => {
                assert_eq!(params.parent_tool_id, "call_123");
                assert_eq!(params.task_id, "task_1");
                assert_eq!(params.agent_name, "sub-agent");
                assert!(matches!(params.event, SubAgentEvent::Other));
            }
            _ => panic!("expected SubAgentProgress"),
        }
    }

    /// Replaying a user message emits one chunk per content block, in order, keeping
    /// the text/image/audio kind of each block.
    #[test]
    fn replay_emits_user_media_chunks_in_order() {
        let session_id = acp::SessionId::new("test-session");
        let events = vec![SessionEvent::User(UserEvent::Message {
            content: vec![
                llm::ContentBlock::text("hello"),
                llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
                llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
            ],
        })];

        let notifications = replay_events_to_notifications(&events, &session_id);
        let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
        assert!(matches!(
            &updates[0],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
        ));
        assert!(matches!(
            &updates[1],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Image(_))
        ));
        assert!(matches!(
            &updates[2],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Audio(_))
        ));
    }

    /// An incomplete thought chunk maps to `AgentThoughtChunk` in live mode.
    #[test]
    fn test_thought_maps_to_agent_thought_chunk() {
        let session_id = acp::SessionId::new("test-session");
        let thought = AgentMessage::Thought {
            message_id: "msg_1".to_string(),
            chunk: "thinking...".to_string(),
            is_complete: false,
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");

        match notification.update {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
                acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
                other => panic!("Expected text content, got {other:?}"),
            },
            other => panic!("Expected AgentThoughtChunk, got {other:?}"),
        }
    }

    /// A tool call maps to an in-progress `ToolCall` with a humanized title.
    #[test]
    fn test_tool_call_maps_to_tool_call_notification() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCall {
            request: ToolCallRequest {
                id: "call_1".to_string(),
                name: "coding__read_file".to_string(),
                arguments: "{}".to_string(),
            },
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");

        match notification.update {
            acp::SessionUpdate::ToolCall(tool_call) => {
                assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(tool_call.title, "Read file");
                assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
            }
            other => panic!("Expected ToolCall, got {other:?}"),
        }
    }

    /// A tool-call update maps to an in-progress `ToolCallUpdate` with parsed raw input.
    #[test]
    fn test_tool_call_update_maps_to_tool_call_update_notification() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCallUpdate {
            tool_call_id: "call_1".to_string(),
            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");

        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
                assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Tool-call updates are mapped identically in live and replay mode.
    #[test]
    fn test_tool_call_update_has_same_live_and_replay_mapping() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCallUpdate {
            tool_call_id: "call_1".to_string(),
            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
            model_name: "TestModel".to_string(),
        };

        let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
            .expect("live notification");
        let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
            .expect("replay notification");

        match (live.update, replay.update) {
            (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
                assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
                assert_eq!(live.fields.status, replay.fields.status);
                assert_eq!(live.fields.raw_input, replay.fields.raw_input);
            }
            other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
        }
    }

    /// Completed text/thought chunks are dropped in live mode and kept in replay mode.
    #[test]
    fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
        let cases: Vec<(AgentMessage, &str)> = vec![
            (
                AgentMessage::Text {
                    message_id: "msg_1".to_string(),
                    chunk: "done".to_string(),
                    is_complete: true,
                    model_name: "TestModel".to_string(),
                },
                "done",
            ),
            (
                AgentMessage::Thought {
                    message_id: "msg_1".to_string(),
                    chunk: "final reasoning".to_string(),
                    is_complete: true,
                    model_name: "TestModel".to_string(),
                },
                "final reasoning",
            ),
        ];

        for (message, expected_text) in cases {
            let session_id = acp::SessionId::new("test-session");
            assert!(
                map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
                "live mode should skip completed chunk"
            );

            let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
                .expect("replay notification");

            match notification.update {
                acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
                    match chunk.content {
                        acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
                        other => panic!("Expected text content, got {other:?}"),
                    }
                }
                other => panic!("Expected chunk update, got {other:?}"),
            }
        }
    }

    /// `ContextCleared` is surfaced as the matching extension notification.
    #[test]
    fn test_context_cleared_maps_to_agent_notification() {
        let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
            .expect("context cleared should emit agent notification");
        assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
    }

    /// A progress message that is not JSON falls back to a plain-text progress line.
    #[test]
    fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
        let session_id = acp::SessionId::new("test-session");

        let tool_progress = AgentMessage::ToolProgress {
            request: ToolCallRequest {
                id: "call_456".to_string(),
                name: "some_tool".to_string(),
                arguments: "{}".to_string(),
            },
            progress: 50.0,
            total: None,
            message: Some("not valid json".to_string()),
        };

        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);

        assert!(notification.is_some());

        let notification = notification.unwrap();
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                // Fail loudly when the content is missing or has an unexpected shape,
                // rather than silently skipping the assertion below.
                let content = update.fields.content.as_ref().expect("fallback update should carry content");
                let Some(acp::ToolCallContent::Content(c)) = content.first() else {
                    panic!("Expected a content entry");
                };
                let acp::ContentBlock::Text(text) = &c.content else {
                    panic!("Expected text content");
                };
                assert!(text.text.contains("not valid json"));
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// Stdio servers keep their name, command, arguments and environment.
    #[test]
    fn test_map_acp_stdio_server() {
        let server = acp::McpServer::Stdio(
            acp::McpServerStdio::new("my-server", "/usr/bin/server")
                .args(vec!["--port".into(), "8080".into()])
                .env(vec![acp::EnvVariable::new("FOO", "bar")]),
        );

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Stdio { name, command, args, env }) => {
                assert_eq!(name, "my-server");
                assert_eq!(command, "/usr/bin/server");
                assert_eq!(args, &["--port", "8080"]);
                assert_eq!(env.get("FOO").unwrap(), "bar");
            }
            other => panic!("Expected Stdio, got {other:?}"),
        }
    }

    /// HTTP servers keep their URL, and the bearer token is extracted from the header.
    #[test]
    fn test_map_acp_http_server() {
        let server = acp::McpServer::Http(
            acp::McpServerHttp::new("http-server", "https://example.com/mcp")
                .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
        );

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
                assert_eq!(name, "http-server");
                assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
                assert_eq!(config.auth_header.as_deref(), Some("token123"));
            }
            other => panic!("Expected Http, got {other:?}"),
        }
    }

    /// The `Bearer` scheme is stripped regardless of casing; other values pass through.
    #[test]
    fn test_http_auth_header_strips_bearer_case_insensitively() {
        let cases = [
            ("Bearer token123", "token123"),
            ("bearer token123", "token123"),
            ("BEARER token123", "token123"),
            ("bEaReR token123", "token123"),
            ("Token foo", "Token foo"),
            ("token123", "token123"),
        ];

        for (input, expected) in cases {
            let server = acp::McpServer::Http(
                acp::McpServerHttp::new("http-server", "https://example.com/mcp")
                    .headers(vec![acp::HttpHeader::new("Authorization", input)]),
            );
            let configs = map_acp_mcp_servers(vec![server]);
            match &configs[0] {
                McpServerConfig::Server(ServerConfig::Http { config, .. }) => {
                    assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
                }
                other => panic!("Expected Http, got {other:?}"),
            }
        }
    }

    /// SSE servers are mapped onto the HTTP server configuration without auth.
    #[test]
    fn test_map_acp_sse_server() {
        let server = acp::McpServer::Sse(acp::McpServerSse::new("sse-server", "https://example.com/sse"));

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
                assert_eq!(name, "sse-server");
                assert_eq!(config.uri.as_ref(), "https://example.com/sse");
                assert_eq!(config.auth_header, None);
            }
            other => panic!("Expected Http, got {other:?}"),
        }
    }

    /// Tool names lose their namespace prefix and are rendered as a capitalised phrase.
    #[test]
    fn test_humanize_tool_name() {
        assert_eq!(humanize_tool_name("coding__read_file"), "Read file");
        assert_eq!(humanize_tool_name("read_file"), "Read file");
        assert_eq!(humanize_tool_name("bash"), "Bash");
        assert_eq!(humanize_tool_name("plugins__coding__read_file"), "Read file");
    }

    /// Result metadata sets the native title and a flat `display_value` meta key.
    #[test]
    fn test_result_with_result_meta_sets_meta() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let session_id = acp::SessionId::new("test-session");
        let result = ToolCallResult {
            id: "call_1".to_string(),
            name: "coding__read_file".to_string(),
            arguments: "{}".to_string(),
            result: "file contents".to_string(),
        };
        let rm: ToolResultMeta = ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines").into();

        let notification = map_tool_result_to_notification(session_id, &result, Some(&rm));
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
                let meta = update.meta.expect("meta should be present");
                assert_eq!(
                    meta.get("display_value").and_then(|v| v.as_str()),
                    Some("Cargo.toml, 156 lines"),
                    "display_value should be a flat key in _meta"
                );
                assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Without result metadata neither a title nor a meta map is set.
    #[test]
    fn test_result_without_result_meta() {
        let session_id = acp::SessionId::new("test-session");
        let result = ToolCallResult {
            id: "call_1".to_string(),
            name: "external__some_tool".to_string(),
            arguments: "{}".to_string(),
            result: "ok".to_string(),
        };

        let notification = map_tool_result_to_notification(session_id, &result, None);
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert!(update.fields.title.is_none());
                assert!(update.meta.is_none());
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Plan metadata is converted into a `Plan` update with matching entries.
    #[test]
    fn test_plan_notification_extracted_from_result_meta() {
        use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};

        let session_id = acp::SessionId::new("test-session");
        let meta = ToolResultMeta::with_plan(
            ToolDisplayMeta::new("Todo", "Research AI agents"),
            PlanMeta {
                entries: vec![
                    PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
                    PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
                ],
            },
        );

        let notification = try_extract_plan_notification(session_id, Some(&meta)).expect("should produce plan");
        match notification.update {
            acp::SessionUpdate::Plan(plan) => {
                assert_eq!(plan.entries.len(), 2);
                assert_eq!(plan.entries[0].content, "Research AI agents");
                assert_eq!(plan.entries[0].status, acp::PlanEntryStatus::InProgress);
                assert_eq!(plan.entries[1].content, "Write tests");
                assert_eq!(plan.entries[1].status, acp::PlanEntryStatus::Pending);
            }
            other => panic!("Expected Plan, got {other:?}"),
        }
    }

    /// No plan notification is produced when the metadata has no plan or is absent.
    #[test]
    fn test_plan_notification_none_when_no_plan_or_no_meta() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let sid = acp::SessionId::new("test-session");
        let meta: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
        assert!(try_extract_plan_notification(sid.clone(), Some(&meta)).is_none());
        assert!(try_extract_plan_notification(sid, None).is_none());
    }

    /// A progress message carrying display metadata updates title/meta and no content.
    #[test]
    fn test_tool_progress_with_display_meta_emits_meta_update() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let session_id = acp::SessionId::new("test-session");
        let meta = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
        let serialized = serde_json::to_string(&meta).unwrap();

        let request = ToolCallRequest {
            id: "call_789".to_string(),
            name: "coding__read_file".to_string(),
            arguments: "{}".to_string(),
        };

        let notification = map_tool_progress_to_notification(session_id, &request, 0.0, None, Some(&serialized))
            .expect("should produce notification");

        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(&*update.tool_call_id.0, "call_789");
                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
                let meta_map = update.meta.expect("meta should be present");
                assert_eq!(
                    meta_map.get("display_value").and_then(|v| v.as_str()),
                    Some("main.rs"),
                    "display_value should be a flat key in _meta"
                );
                assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
                assert!(update.fields.content.is_none());
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// `replay_to_client` forwards every replayed event to the peer as a session
    /// notification for the given session.
    #[tokio::test(flavor = "current_thread")]
    async fn replay_to_client_forwards_each_event_as_session_notification() {
        use acp_utils::testing::test_connection;
        use llm::ContentBlock as LlmContentBlock;
        use tokio::task::LocalSet;

        LocalSet::new()
            .run_until(async {
                let (cx, mut peer) = test_connection().await;
                let session_id = acp::SessionId::new("test-session");
                let events = vec![SessionEvent::User(UserEvent::Message {
                    content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
                })];

                replay_to_client(&events, &cx, &session_id).await;

                for _ in 0..2 {
                    let notif = peer.next_session_notification().await;
                    assert_eq!(notif.session_id, session_id);
                    assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
                }
            })
            .await;
    }
}